Handling workers with expiring allocation requests #122

Closed
wgustafson opened this issue Aug 10, 2018 · 83 comments · Fixed by #481 · May be fixed by dask/distributed#2844
Labels: bug, documentation, enhancement, help wanted

Comments

@wgustafson
Contributor

I am trying to figure out how to handle the case of dask workers getting bumped from a cluster due to their requested allocation time expiring. From the intro YouTube video at https://www.youtube.com/watch?v=FXsgmwpRExM, it sounds like dask-jobqueue should detect when a worker expires and automatically start a replacement, which is what I want. However, my testing on DOE's edison computer at NERSC is not getting that behavior. If it matters, edison uses SLURM.

I have tried setting up my cluster two ways, and both behave the same: I start a script that uses dask.delayed to do a bunch of embarrassingly parallel tasks, the scheduler spawns one worker, that worker does the first task or two, the worker's allocation expires, the scheduler seems to hang, and nothing else happens.

The first approach I used to set up the cluster was with "scale":

    from dask_jobqueue import SLURMCluster
    from dask.distributed import Client

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.scale(1)  # testing with as simple as I can get, cycling 1 worker
    client = Client(cluster, timeout='45s')

@josephhardinee suggested a 2nd approach using "adapt" instead:

    cluster = SLURMCluster(cores=1, processes=1)  # need all the memory for one task
    cluster.adapt(minimum=1, maximum=1)  # trying adapt instead of scale
    client = Client(cluster, timeout='45s')

The dask-worker.err log concludes with:

slurmstepd: error: *** JOB 10234215 ON nid01242 CANCELLED AT 2018-08-10T13:25:30 DUE TO TIME LIMIT ***
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.4.227:35634'
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>

Am I expecting more from dask-jobqueue than I should? Or is this a bug in my implementation, or in dask.distributed or dask-jobqueue?

Thanks,
Bill

@guillaumeeb
Member

Thanks for this really interesting test! I haven't encountered this yet, as I am currently only using dask for interactive analysis, and the computations I launch usually have enough workers to finish in a few minutes at most.

However, I believe that, at least with an adaptive cluster, workers should be restarted for long-running processes when their allocation time expires. I will certainly need this in the near future. Currently, this is clearly not the case for non-adaptive clusters: I cannot see anything in the code that would restart workers when a job ends. For adaptive, I don't know the dask.distributed code base well enough yet, but the answer is probably there. Hopefully @mrocklin can answer this more easily.

I will try to reproduce your use case on my system next week.

In any case, if this is not a bug in adaptive or in our use of it, I think we should propose a solution here for having workers automatically restarted. Any advice @jhamman @mrocklin @lesteve ?

@mrocklin
Member

Am I expecting more from dask-jobqueue than I should?

What you're describing seems like it's in scope for dask-jobqueue, but it's not currently implemented. This could be resolved by recording a desired number of workers and re-scaling whenever a worker goes away, which could probably be handled by the remove_worker method of the JobQueuePlugin(SchedulerPlugin) class.
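For illustration, a rough sketch of that idea (the plugin name and the registration call below are hypothetical, not the actual JobQueuePlugin code):

from distributed.diagnostics.plugin import SchedulerPlugin

class ReplaceExpiredWorkers(SchedulerPlugin):
    """Hypothetical plugin: keep the cluster at a recorded worker target."""

    def __init__(self, cluster, target):
        self.cluster = cluster  # the JobQueueCluster to keep topped up
        self.target = target    # desired number of workers

    def remove_worker(self, scheduler=None, worker=None, **kwargs):
        # Called whenever a worker disappears, e.g. because its job hit the
        # walltime; ask the cluster to scale back up to the recorded target.
        if len(scheduler.workers) < self.target:
            self.cluster.scale(self.target)

# cluster.scheduler.add_plugin(ReplaceExpiredWorkers(cluster, target=4))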

@guillaumeeb
Member

but it's not currently implemented

So I take it adaptive clusters in distributed don't handle this either? Shouldn't they?

which could probably be handled by the remove_worker method of the JobQueuePlugin(SchedulerPlugin)

I had that in mind too, I'd be happy to work on it.

@mrocklin
Member

mrocklin commented Aug 13, 2018 via email

@mrocklin
Member

mrocklin commented Aug 13, 2018 via email

@guillaumeeb
Member

So I've tried that with and without adaptive. As expected, without adaptive, once the worker is killed, nothing happens anymore.

With adaptive scaling, in my setup, a worker is restarted every time the previous one dies. Cool, that's what I want.

However, I'm still running into a difficult issue. Here is the code I use for some background:

import time

from dask.distributed import Client

# `cluster` is a job-queue cluster created earlier
cluster.adapt(minimum=1, maximum=1)
client = Client(cluster)

def my_slow_task(i):
    time.sleep(10)
    return i**2

results = client.gather(client.map(my_slow_task, range(10)))

The problem is that the worker keeps all results in its memory in order to send them back all at once, but it gets killed halfway through every time. So after several tries we end up with the following exception:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-6-69e1e4e9d452> in <module>()
      1 futures = client.map(my_slow_task, range(10))
----> 2 results = client.gather(futures)
      3 results

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1560             return self.sync(self._gather, futures, errors=errors,
   1561                              direct=direct, local_worker=local_worker,
-> 1562                              asynchronous=asynchronous)
   1563 
   1564     @gen.coroutine

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    650             return future
    651         else:
--> 652             return sync(self.loop, func, *args, **kwargs)
    653 
    654     def __repr__(self):

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    273             e.wait(10)
    274     if error[0]:
--> 275         six.reraise(*error[0])
    276     else:
    277         return result[0]

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/distributed/utils.py in f()
    258             yield gen.moment
    259             thread_state.asynchronous = True
--> 260             result[0] = yield make_coro()
    261         except Exception as exc:
    262             error[0] = sys.exc_info()

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1131 
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/asyncio/futures.py in result(self)
    292             self._tb_logger = None
    293         if self._exception is not None:
--> 294             raise self._exception
    295         return self._result
    296 

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1139                     if exc_info is not None:
   1140                         try:
-> 1141                             yielded = self.gen.throw(*exc_info)
   1142                         finally:
   1143                             # Break up a reference to itself

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1437                             six.reraise(type(exception),
   1438                                         exception,
-> 1439                                         traceback)
   1440                     if errors == 'skip':
   1441                         bad_keys.add(key)

/work/ADM/hpc/eynardbg/Outils/miniconda3/envs/dask_dev/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ('my_slow_task-f7d3b870b0218db300f66ed70fe3fba2', 'tcp://10.135.36.29:44953')

@mrocklin
Member

mrocklin commented Aug 13, 2018 via email

@josephhardinee
Contributor

Out of curiosity, how does that work? I assume when a new worker needs to be started, a new jobscript has to be submitted. I assume that would need to be handled by jobqueue and not distributed?

@guillaumeeb
Member

I don't know exactly how the adaptive cluster is implemented in distributed, but the idea is that something watches the dask cluster load and calls the scale method accordingly, for example when there are far more pending tasks than available workers.

Considering the issue here:

  • adaptive is already covering the first identified need: ensuring there is always a minimum number of workers alive. Is it still worth implementing that as a dask-jobqueue default?
  • Second, users should not submit graphs that cannot finish within the allocated job time with the requested number of workers. Submitting several graphs should be fine though (doing batches if needed; see the sketch after this list). Is this correct? Should we add this to the docs somehow?
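For instance, a minimal sketch of that batching idea (the task function and batch size are just placeholders, assuming `client` is connected to a job-queue cluster):

import time

def my_slow_task(i):
    time.sleep(10)
    return i ** 2

def run_in_batches(client, n_tasks, batch_size=4):
    # Each batch is small enough to finish within one job's walltime, so no
    # single graph outlives the workers computing it.
    results = []
    for start in range(0, n_tasks, batch_size):
        chunk = range(start, min(start + batch_size, n_tasks))
        results.extend(client.gather(client.map(my_slow_task, chunk)))
    return results

# results = run_in_batches(client, n_tasks=20)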

@mrocklin
Member

FWIW the adaptive implementation is only 300 lines of code. It might be a good evening read if anyone is interested:

https://github.com/dask/distributed/blob/master/distributed/deploy/adaptive.py

adaptive is already covering the first identified need: ensuring there is always a minimum number of workers alive. Is it still worth implementing that as a dask-jobqueue default?

I don't know. @jhamman might have thoughts. Alternatively maybe there is some way to change Adaptive to maintain a set target rather than move that target around.

Second, users should not submit graphs that cannot finish within the allocated job time with the requested number of workers. Submitting several graphs should be fine though (doing batches if needed). Is this correct? Should we add this to the docs somehow?

It should be possible to run a single large graph with an evolving herd of workers, even if the workers don't live very long.

@jhamman
Member

jhamman commented Aug 13, 2018

Alternatively maybe there is some way to change Adaptive to maintain a set target rather than move that target around.

I think this is necessary to do this correctly. Ideally we could have a Cluster.target_workers property. This property would be updated by both Cluster.scale() and Adaptive. My recent attempts to improve the adaptive scaling in jobqueue in #97 have highlighted the need for this.

Another thing that would be worth discussing is the ability for Adaptive to query the Cluster for pending workers. This obviously comes up in jobqueue when there are jobs waiting in a queue, but I imagine this will also be useful to other dask deploy managers (dask-kubernetes, etc.).
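A rough sketch of that target_workers idea (the names and structure below are illustrative only, not the actual implementation):

class SketchCluster:
    """Sketch: one recorded target that both scale() and Adaptive update."""

    def __init__(self):
        self.target_workers = 0   # single source of truth for the desired size
        self.pending_jobs = {}    # jobs submitted but not yet running
        self.running_jobs = {}    # jobs with a live worker

    def scale(self, n):
        # Manual scaling and Adaptive both record the target here, so a worker
        # lost to the walltime can always be replaced up to target_workers.
        self.target_workers = n
        self._correct_state()

    def _correct_state(self):
        missing = self.target_workers - len(self.pending_jobs) - len(self.running_jobs)
        for _ in range(max(missing, 0)):
            self._submit_job()  # hypothetical job-submission helper

    def _submit_job(self):
        pass  # e.g. build and submit a job script to the queueing system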

@wgustafson
Contributor Author

Sounds like you have a pretty good idea of my workflow scenario and some initial ideas on what might be needed to enable it to work predictably. Thanks for looking into this.

From @guillaumeeb it looks like there might be something going on with NERSC's edison computer compared to his cluster since I was not seeing any restarts, whereas he was getting restarts but with data passing issues. If you would like me to run any tests on edison, let me know.

@guillaumeeb
Member

@wgustafson, it could still be useful if you could give us a simplified version of the code you're trying to run, just to be sure.

For information, I've performed my tests with dask-jobqueue installed from one of the latest commits on the master branch. Were you using this too, or did you just pip (or conda) install the latest released package?

@jhamman just to be sure, apart from fixing some edge-case issues detected with #97, do you believe it would be a good idea to have automatic restart of finished jobs by default?

It should be possible to run a single large graph with an evolving herd of workers, even if the workers don't live very long.

@mrocklin from my recent experiment here, I cannot see how this could be possible for now. Is in-memory data backed up or transmitted in some way between workers? The simple workflow of submitting a group of independent tasks seems to show that workers need to last long enough to process nb_tasks / nb_workers tasks each. But I did not try with several workers, so this is just a guess.

@jhamman
Member

jhamman commented Aug 13, 2018

@jhamman just to be sure, apart from fixing some edge-case issues detected with #97, do you believe it would be a good idea to have automatic restart of finished jobs by default?

I was thinking this would only work when using Adaptive. I think a non-adaptive job that dies due to a wall-clock expiration should stay dead.

@guillaumeeb
Member

Yep, I agree with you @jhamman: there is no reason to implement a more complex default behavior when most of the time it is not needed (and probably not what we want), and when there is already a mechanism that answers the need.

So this works right now with Adaptive, and you've also opened #112 to fix the edge cases.

Another thing that would be worth discussing is the ability for Adaptive to query the Cluster for pending workers.

I'll let you open another issue for that part, describing the advantages it would have.

Now @wgustafson, let's try to solve your adaptive problem, and also see if your workflow is compatible with dying and restarting workers.

@josephhardinee
Contributor

I assume that starting a new worker requires a job script to be submitted to the scheduler (on systems that are not exclusive allocations). This probably has to be handled by jobqueue, and I'm wondering what the mechanism for transferring that information between distributed's Adaptive and jobqueue is. I didn't notice an explicit mechanism for this.

@wgustafson
Contributor Author

I obtained dask-jobqueue via "conda install dask-jobqueue -c conda-forge" following the docs. The versions I have for my software stack are

  • Python 3.6.5 w/ iPython 6.4.0 (via conda)
  • dask and dask-core 0.18.2
  • dask-jobqueue 0.3.0

I have started tinkering with a simplified script based on the my_slow_task script above to see how that behaves on edison. Hopefully, later today I will get time to verify it acts the same.

@mrocklin
Member

@josephhardinee

def _calls(self, cmds, **kwargs):
    """ Call a command using subprocess.communicate

    This centralizes calls out to the command line, providing consistent
    outputs, logging, and an opportunity to go asynchronous in the future

    Parameters
    ----------
    cmd: List(List(str))
        A list of commands, each of which is a list of strings to hand to
        subprocess.communicate

    Examples
    --------
    >>> self._calls([['ls'], ['ls', '/foo']])

    Returns
    -------
    The stdout result as a string
    Also logs any stderr information
    """
    logger.debug("Submitting the following calls to command line")
    procs = []
    for cmd in cmds:
        logger.debug(' '.join(cmd))
        procs.append(subprocess.Popen(cmd,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE,
                                      **kwargs))
    result = []
    for proc in procs:
        out, err = proc.communicate()
        if err:
            logger.error(err.decode())
        result.append(out)
    return result

@wgustafson
Contributor Author

wgustafson commented Aug 14, 2018

OK, I've tested code similar to that from @guillaumeeb, where he reported seeing the restarts happening. On edison, I do not get any restarted workers after the worker allocation times out. I tried using both the original client.gather approach as well as the as_completed approach suggested by @mrocklin.

As a check, I also tried using workers that lasted 10 minutes to make sure everything completes correctly when given enough time, and the long queue request succeeded. For the restart testing, I used 1 min. queue requests for the workers, and I tried with both 1 and 2 workers, but to no avail. Watching the SLURM queue, I could see the controlling job start, the worker(s) start, the workers complete, and then the controller just waiting until it times out.

Here is the python code for running dask:

from dask.distributed import Client, as_completed
from dask_jobqueue import SLURMCluster
import numpy as np
import time


def my_slow_task(i):
    print("i=", i)
    time.sleep(10)
    return i**2

# Start the cluster...
cluster = SLURMCluster(cores=1, processes=1, walltime="00:01:00")
print(cluster.job_script())
cluster.adapt(minimum=1, maximum=1)
client = Client(cluster)

# Do some work...
# Method 1 using client.gather
# results = client.gather(client.map(my_slow_task, range(20)))

# Method 2 using as_completed
ntasks = 20
feedback = np.empty(ntasks)
results = []
feedback = client.map(my_slow_task, range(ntasks))
for future in as_completed(feedback):
    results.append(future.result())

print(results)

And, here is the job submission script I used to submit the above code:

#!/bin/csh
#SBATCH --job-name=dasktest
#SBATCH --time=00:15:00
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=24  #edison full core count
#SBATCH --output=slurm.out
#SBATCH -p debug
cd /global/u1/d/d3m088/cmdv/code/testing_dask
ipython test_dask_restarts.py

The versions of my software are given in my earlier post today. If it helps, here is the full dump from dask-worker.err. The dask-worker.out file is empty and no output gets to slurm.out. Is the lack of file locking on this system causing the difference for edison?

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.128.14.25:40990'
distributed.diskutils - ERROR - Failed to clean up lingering worker directories in path: %s
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 232, in new_work_dir
    self._purge_leftovers()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 137, in _purge_leftovers
    lock.acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 188, in acquire
    self._lock.acquire(self._timeout, self._retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 118, in acquire
    lock.acquire(timeout, retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 168, in acquire
    path=self._path,
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 100, in _acquire_non_blocking
    success = acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 165, in <lambda>
    acquire=lambda: _lock_file_non_blocking(self._file),
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 56, in _lock_file_non_blocking
    fcntl.flock(file_.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
OSError: [Errno 524] Unknown error 524
distributed.diskutils - ERROR - Could not acquire workspace lock on path: /global/u1/d/d3m088/cmdv/code/testing_dask/dask-worker-space/worker-sfxupu98.dirlock .Continuing without lock. This may result in workspaces not being cleaned up
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 57, in __init__
    with workspace._global_lock():
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 194, in __enter__
    self.acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 188, in acquire
    self._lock.acquire(self._timeout, self._retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 118, in acquire
    lock.acquire(timeout, retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 162, in acquire
    _lock_file_blocking(self._file)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 52, in _lock_file_blocking
    fcntl.flock(file_.fileno(), fcntl.LOCK_EX)
OSError: [Errno 524] Unknown error 524
distributed.worker - INFO -       Start worker at:   tcp://10.128.14.25:45709
distributed.worker - INFO -          Listening to:   tcp://10.128.14.25:45709
distributed.worker - INFO -              nanny at:         10.128.14.25:40990
distributed.worker - INFO -              bokeh at:          10.128.14.25:8789
distributed.worker - INFO - Waiting to connect to:   tcp://10.128.0.130:45360
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   64.00 GB
distributed.worker - INFO -       Local Directory: /global/u1/d/d3m088/cmdv/code/testing_dask/dask-worker-space/worker-sfxupu98
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.128.0.130:45360
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.128.14.25:40990'
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>

@guillaumeeb guillaumeeb mentioned this issue Aug 14, 2018
@guillaumeeb
Member

As mentioned in #126, I fear that adaptive mode is broken in release 0.3.0 of dask-jobqueue. It has since been fixed by #63.

I would recommend trying the master branch and seeing if that fixes this wrong behaviour.

@guillaumeeb
Member

I confirm that adaptive is broken with 0.3.0, I see the same behaviour as @wgustafson: no worker is restarted after the first one is stopped.

@wgustafson
Contributor Author

I did some tests and don't necessarily have good news to report.

I uninstalled dask-jobqueue v0.3.0 and installed the current master from this afternoon, reported as v0.3.0+31.g4ea3c98. I can confirm that new workers now start when old ones time out in the SLURM queue, which is what we wanted. However, I am now unable to get any results using the code above. The workers keep cycling beyond when the work should be completed and the scheduler task is not properly collecting the results and reporting them. Even when I request a short amount of work that should complete in a single worker's allocation, the workers keep cycling until the scheduler's allocation expires. Also, if I switch from cluster.adapt to cluster.scale I do not get a result.

My slurm.out file is reporting issues with workers already existing, as shown by the following dump. So, I'm not sure if the newest dask-jobqueue version is a step forward or backward for this particular workflow.

distributed.scheduler - ERROR - '10314564'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1267, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 61, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '10314564'
distributed.utils - ERROR - Worker already exists tcp://128.55.203.205:19379
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/utils.py", line 646, in log_errors
    yield
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1226, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://128.55.203.205:19379
distributed.core - ERROR - Worker already exists tcp://128.55.203.205:19379
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/core.py", line 320, in handle_comm
    result = yield result
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1226, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://128.55.203.205:19379
distributed.scheduler - ERROR - '10314564'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1662, in remove_worker
    plugin.remove_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 73, in remove_worker
    del self.running_jobs[job_id][name]
KeyError: '10314564'

@guillaumeeb
Member

We've seen these KeyError exceptions before. This probably means that inside one job, your worker terminates with an error and is automatically restarted over and over.

We have to find the root cause of the worker failure; sometimes it is an out-of-memory error, but I suspect it is not in your case...

@guillaumeeb
Member

In order to move forward here, @wgustafson could you provide the first error you see in the slurm.out file? We need to find the root cause of the worker dying prematurely.

@wgustafson
Contributor Author

I re-ran the code to get the STDOUT and STDERR in slurm.out. This caused the IP addresses to change, so below I include both the slurm.out and dask-worker.err log output. The code and slurm submission script are from my previous comment on August 14, 2018 (above). Note that I am getting the print output from print(cluster.job_script()) but I never get the final results printed from the actual calculation, which should be transmitted back to the controlling process.

First, the contents of slurm.out (terminal escape codes stripped for readability):

Mon Aug 27 09:24:31 PDT 2018
distributed.scheduler - ERROR - '10512912'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1267, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 61, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '10512912'
distributed.utils - ERROR - Worker already exists tcp://128.55.203.68:46211
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/utils.py", line 646, in log_errors
    yield
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1226, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://128.55.203.68:46211
distributed.core - ERROR - Worker already exists tcp://128.55.203.68:46211
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/core.py", line 320, in handle_comm
    result = yield result
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1226, in add_worker
    raise ValueError("Worker already exists %s" % address)
ValueError: Worker already exists tcp://128.55.203.68:46211
distributed.scheduler - ERROR - '10512912'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1662, in remove_worker
    plugin.remove_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 73, in remove_worker
    del self.running_jobs[job_id][name]
KeyError: '10512912'
distributed.scheduler - ERROR - '10512920'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1267, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 61, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '10512920'
distributed.scheduler - ERROR - '10512920'
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/scheduler.py", line 1662, in remove_worker
    plugin.remove_worker(scheduler=self, worker=address)
  File "/global/u1/d/d3m088/python/dask-jobqueue/dask_jobqueue/core.py", line 73, in remove_worker
    del self.running_jobs[job_id][name]
KeyError: '10512920'
#!/bin/bash

#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -e dask-worker.err
#SBATCH -o dask-worker.out
#SBATCH -p debug
#SBATCH -n 1
#SBATCH --cpus-per-task=1
#SBATCH --mem=60G
#SBATCH -t 00:01:00
JOB_ID=${SLURM_JOB_ID%;*}



/global/homes/d/d3m088/.conda/envs/bill/bin/python -m distributed.cli.dask_worker tcp://128.55.203.147:28265 --nthreads 1 --memory-limit 64.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
/global/u1/d/d3m088/cmdv/code/testing_dask/test_dask_restarts.py in <module>()
     32 feedback = client.map(my_slow_task, range(ntasks))
     33 for future in as_completed(feedback):
---> 34     print("result=", future.result())
     35     results.append(future.result())
     36 

~/.conda/envs/bill/lib/python3.6/site-packages/distributed/client.py in result(self, timeout)
    191                                   raiseit=False)
    192         if self.status == 'error':
--> 193             six.reraise(*result)
    194         elif self.status == 'cancelled':
    195             raise result

~/.conda/envs/bill/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

KilledWorker: ('my_slow_task-e10b2ccc4ae480dcfa7052ff39639fcd', 'tcp://128.55.203.68:46284')
Exception ignored in: <generator object add_client at 0x2aaacafe0360>
RuntimeError: generator ignored GeneratorExit
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 198, in read
    convert_stream_closed_error(self, e)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 198, in read
    convert_stream_closed_error(self, e)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/comm/tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
Mon Aug 27 09:27:54 PDT 2018

And, dask-worker.err:

distributed.nanny - INFO -         Start Nanny at: 'tcp://128.55.203.68:45895'
distributed.diskutils - ERROR - Failed to clean up lingering worker directories in path: %s 
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 232, in new_work_dir
    self._purge_leftovers()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 137, in _purge_leftovers
    lock.acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 188, in acquire
    self._lock.acquire(self._timeout, self._retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 118, in acquire
    lock.acquire(timeout, retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 168, in acquire
    path=self._path,
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 100, in _acquire_non_blocking
    success = acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 165, in <lambda>
    acquire=lambda: _lock_file_non_blocking(self._file),
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 56, in _lock_file_non_blocking
    fcntl.flock(file_.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
OSError: [Errno 524] Unknown error 524
distributed.diskutils - ERROR - Could not acquire workspace lock on path: /global/u1/d/d3m088/cmdv/code/testing_dask/dask-worker-space/worker-ca6aslbu.dirlock .Continuing without lock. This may result in workspaces not being cleaned up
Traceback (most recent call last):
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/diskutils.py", line 57, in __init__
    with workspace._global_lock():
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 194, in __enter__
    self.acquire()
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 188, in acquire
    self._lock.acquire(self._timeout, self._retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 118, in acquire
    lock.acquire(timeout, retry_period)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 162, in acquire
    _lock_file_blocking(self._file)
  File "/global/homes/d/d3m088/.conda/envs/bill/lib/python3.6/site-packages/distributed/locket.py", line 52, in _lock_file_blocking
    fcntl.flock(file_.fileno(), fcntl.LOCK_EX)
OSError: [Errno 524] Unknown error 524
distributed.worker - INFO -       Start worker at:  tcp://128.55.203.68:46284
distributed.worker - INFO -          Listening to:  tcp://128.55.203.68:46284
distributed.worker - INFO -              nanny at:        128.55.203.68:45895
distributed.worker - INFO -              bokeh at:         128.55.203.68:8789
distributed.worker - INFO - Waiting to connect to: tcp://128.55.203.147:28265
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   64.00 GB
distributed.worker - INFO -       Local Directory: /global/u1/d/d3m088/cmdv/code/testing_dask/dask-worker-space/worker-ca6aslbu
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://128.55.203.147:28265
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://128.55.203.147:28265
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://128.55.203.68:45895'
distributed.dask_worker - INFO - Exiting on signal 15
distributed.dask_worker - INFO - End worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>

@guillaumeeb
Copy link
Member

The problem looks a bit like #139. Do you define a local-directory to use for your workers? They seem to be trying to lock shared folders. Could you try to set it when constructing your cluster? Common values are /tmp or /scratch.
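For example (assuming a SLURM setup and node-local storage mounted at /tmp):

from dask_jobqueue import SLURMCluster

# Point the workers at node-local storage instead of the shared home directory
cluster = SLURMCluster(cores=1, processes=1, local_directory='/tmp')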

@wgustafson
Copy link
Contributor Author

I just tried manually stepping through the code and watching files get created on the disk system as I go. For my setup, I was using the default system temp drive, which is /tmp via $TMPDIR. I did not see anything get created in it. The directory containing the code was in my home directory, which is cross-mounted to the compute nodes on edison. So, it may not be possible to do file locking for it. The shared lustre system also does not have locking (I think). The home filesystem is where the dask-worker.err, dask-worker.out, and dask-worker-space/* get created.

If it matters, I also have my defaults in jobqueue.yaml set to not permit memory spilling. I am using

distributed:
  worker:
    memory:
      target: False  # Avoid spilling to disk
      spill: False  # Avoid spilling to disk
      pause: 0.80  # fraction at which we pause worker threads
      terminate: 0.95  # fraction at which we terminate the worker

I have noticed that the *.err and *.out files get into a race condition when I use multiple processes. I've watched them get overwritten as processes spin up, effectively wiping out output from previous processes. In the current tests I did not see this since I am only using one process (unless it happened when the adapt methodology spun up a new process and I missed it).

@mrocklin
Member

Small correction:

all tasks from that worker are marked as suspicious, even if they completed

This isn't true. Completed tasks are not marked as suspicious, just all executing and pending tasks on that worker (the scheduler doesn't know which tasks the worker has chosen to execute at any given moment, but it does know once a task completes).

@eXpensia

eXpensia commented Aug 28, 2019

Is there any news on a solution? I tried the solution from @guillaumeeb; the problem is that when I specify minimum=0 in cluster.adapt I never see a job request when I check the queue. When I put a higher number, I get some, but when they are killed for reaching the walltime they are not renewed.

@lesteve
Member

lesteve commented Aug 28, 2019

The functionality mentioned by @jni in #122 (comment) is in a distributed release (distributed >= 2.2.0; see the relevant commit for more details).

In principle, we could use the extra parameter of your favourite Cluster object to pass additional parameters to dask-worker, i.e. use --lifetime, --lifetime-restart and --lifetime-stagger to address the original problem.
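A sketch of what that could look like (the walltime and lifetime values below are placeholders; the lifetime just needs to stay a bit below the job walltime):

from dask_jobqueue import SLURMCluster
from dask.distributed import Client

cluster = SLURMCluster(
    cores=1,
    memory='4GB',
    walltime='00:30:00',
    # retire workers cleanly before the queueing system kills the job
    extra=['--lifetime', '25m', '--lifetime-stagger', '2m'],
)
cluster.adapt(minimum=0, maximum=4)
client = Client(cluster)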

If someone has the time to look at this, it would be great. Something that would be extremely useful would be a stand-alone example to reproduce the problem.

Full disclosure : I am very unlikely to look at this in the near future, so adding the "help wanted" label.

@lesteve lesteve added help wanted Extra attention is needed and removed sprint A good issue to tackle during a sprint labels Aug 28, 2019
@jni

jni commented Aug 29, 2019

@lesteve sorry that I didn't reply to your last ping. This remains on my radar but the earliest I can work with it is mid-September. I think you're right, passing those parameters to dask-worker is the right solution.

@lesteve
Member

lesteve commented Aug 29, 2019

Thanks for letting us know and no worries I think everyone is busy with many other things.

If someone has the time to put together a stand-alone snippet reproducing the problem, it would be really appreciated! If I were to look at this I would do something like:

cluster = YourCluster(walltime='00:00:30', ...)
cluster.scale(1)
client = Client(cluster)

def func(i):
    time.sleep(5)

futures = client.map(func, range(100))

The idea is that the total time for all the tasks exceeds the walltime of a single job.

@eXpensia

eXpensia commented Aug 29, 2019

Thanks for the answers, I'll check the extra parameters.

@jni

jni commented Oct 14, 2019

@lesteve I'm finally ready to work on this. =) My main issue is that I don't understand how to test everything locally. Does dask-jobqueue provide a way to create a "dummy" cluster/scheduler for testing purposes? I.e. I'd like to test submitting jobs on my machine without having to depend on my institutional SLURM cluster. Any pointers here would be highly appreciated!

@lesteve
Member

lesteve commented Oct 14, 2019

Great to hear that!

I think an excellent first step would be to have a minimal example to reproduce the original problem on a real cluster. This can be done independently of testing things locally.

About the dask-jobqueue testing infrastructure, we do have a docker-compose setup for some schedulers (SGE, PBS, SLURM) that you can run locally (and that is used for CI).

There is a bit of doc about that in https://jobqueue.dask.org/en/latest/develop.html#test-with-job-scheduler.

#324 might have some additional useful info as well.

PRs to improve the doc about how to test locally and/or to make it more prominent are more than welcome!

@jni

jni commented Oct 14, 2019

This should be sufficient for a reproducible example. I'm making it take 30min but it should be straightforward to reduce that... I just don't have a good sense of the overhead of launching a worker so I was reluctant to reduce it further. To reduce it:

  • reduce the total duration (in cluster.adapt)
  • reduce the walltime to half of that duration
  • reduce the range call in filenames = [] to 4 * 60 * target duration (in mins)

Everything else should be the same.

import time
import numpy as np
from dask_jobqueue import SLURMCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:15:00', cores=1, memory='4gb', job_cpu=1,
                  job_mem='4gb')
cluster.adapt(minimum=0, maximum=4, target_duration='30min')
client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 30 min * 60s/min = 7200 cpu.s
filenames = [f'img{num}.jpg' for num in range(7200)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features


num_files = len(filenames)
num_features = len(features(filenames[0])[1])

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, enumerate(filenames))):
    i, v = future.result()
    X[i, :] = v

assert not np.any(np.sum(X, axis=1) == 0)  # make sure all rows were filled

Here's my yaml config, though it probably doesn't need much of this:

jobqueue:
  slurm:
    name: dask-worker

    # Dask worker options
    cores: 8                # Total number of cores per job
    memory: 36gb                # Total amount of memory per job
    processes: 1                # Number of Python processes per job

    #interface: null             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: $TMPDIR       # Location of fast local storage like /scratch or $TMPDIR

    # SLURM resource manager options
    #queue: null
    project: su62
    walltime: '01:00:00'
    #extra: []
    #env-extra: []
    job-cpu: 8
    job-mem: 36gb
    #job-extra: {}
    log-directory: /home/jnun0003/dask-logs

To work around the issue with current dask-jobqueue, you do:

work_queue = as_completed(client.map(features, enumerate(filenames)))
for future in work_queue:
    try:
        i, v = future.result()
    except KilledWorker:
        client.retry([future])
        work_queue.add(future)
    else:
        X[i, :] = v

in the for-loop.

To fix the issue with the latest dask-distributed, when a dask-worker job gets submitted to the cluster, we should add --lifetime=<value> where value is the walltime of the cluster worker, minus some small margin, e.g. 1min. This will make the dask scheduler not treat the death of that worker as suspicious. I think .adapt will take care of launching a new worker. It's unclear to me whether --lifetime-stagger is something we should worry about at this point.

Please let me know if the script above is sufficient to reproduce the issue for you!

@lesteve
Member

lesteve commented Oct 15, 2019

Great thanks a lot for your input!

This is the snippet I am trying to run on my SGE cluster. I just launched it and will tell you whether this reproduces the problem or not.

There are a few fixes I had to do (lines are indicated by # FIX in the snippet below):

  • for the latest version of distributed (30minutes rather than 30min, list(enumerate) rather than enumerate otherwise "TypeError: Dask no longer supports mapping over Iterators or Queues.Consider using a normal for loop and Client.submit")
  • one minor fix in your logic
import time
import numpy as np
from dask_jobqueue import SGECluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:15:00', cores=1, memory='4gb',
    queue='all.q',
    resource_spec='h_vmem=10G,mem_req=4G') #, job_cpu=1, job_mem='4gb')

cluster.adapt(minimum_jobs=0, maximum_jobs=4, target_duration='30minutes') # FIX
client = Client(cluster)


# each job takes 1s, and we have 4 cpus * 30 min * 60s/min = 7200 cpu.s
filenames = [f'img{num}.jpg' for num in range(7200)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features


num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
    i, v = future.result()
    X[i, :] = v

assert not np.any(np.sum(X, axis=1) == 0)  # make sure all rows were filled

Error:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
~/work/test-dask-jobqueue.py in <module>
     30 
     31 for future in as_completed(client.map(features, list(enumerate(filenames)))):
---> 32     i, v = future.result()
     33     X[i, :] = v
     34 

~/miniconda3/envs/dask/lib/python3.7/site-packages/distributed/client.py in result(self, timeout)
    219         if self.status == "error":
    220             typ, exc, tb = result
--> 221             raise exc.with_traceback(tb)
    222         elif self.status == "cancelled":
    223             raise result

KilledWorker: ('features-9926151cd21fae08dd7f1e8344cadf85', <Worker 'tcp://10.141.0.18:35898', memory: 0, processing: 3617>)

@lesteve
Member

lesteve commented Oct 16, 2019

I can reproduce the problem indeed, which is a great start. I edited my earlier message with the error.

@guillaumeeb
Member

Thanks to @willsALMANJ's issue a few days ago, I tried the --lifetime option and I confirm that it works perfectly with the latest Dask, Distributed and Jobqueue versions.

The initial script I used (just reduced time):

import time
import numpy as np
from dask_jobqueue import PBSCluster as Cluster
from dask import delayed
from dask.distributed import Client, as_completed

# config in $HOME/.config/dask/jobqueue.yaml
cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb')
cluster.adapt(minimum=0, maximum=4) # FIX

client = Client(cluster)

# each job takes 1s, and we have 4 cpus * 1 min * 60s/min = 240 cpu.s, let's ask for a little more tasks.
filenames = [f'img{num}.jpg' for num in range(480)]

def features(num_fn):
    num, image_fn = num_fn
    time.sleep(1)  # takes about 1s to compute features on an image
    features = np.random.random(246)
    return num, features

num_files = len(filenames)
num_features = len(features((0, filenames[0]))[1]) # FIX

X = np.zeros((num_files, num_features), dtype=np.float32)

for future in as_completed(client.map(features, list(enumerate(filenames)))): # FIX
    i, v = future.result()
    X[i, :] = v

It fails with a KilledWorker exception when the first 4 workers are killed due to the walltime.

Just modify the cluster initialization:

cluster = Cluster(walltime='00:01:00', cores=1, memory='4gb', extra=["--lifetime", "50s"])

And it works! I think it solves the problem here.

It's unclear to me whether --lifetime-stagger is something we should worry about at this point

I think this would be valuable when scaling up with 100s of workers, at that point you don't want them all to stop at the same time.

I'll try to produce some documentation to explain all that and close this issue. The outline should look something like:

How to handle Job Queueing system walltime killing workers

Reaching walltime can be troublesome

  • when you don't have a lot of room on your HPC platform and have only a few workers at a time: these workers will be killed (and others started) before your workload ends.
  • when you really don't know how long your workload will take: all your workers could be killed before reaching the end. In this case, you want to use adaptive clusters.

If you don't set the proper parameters, you'll run into KilledWorker exceptions in those two cases.
Use the --lifetime worker option. This enables infinite workloads using adaptive.

Use --lifetime-stagger when dealing with many workers.

Examples

cluster = Cluster(walltime='01:00:00', cores=4, memory='16gb', extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
cluster.adapt(minimum=0, maximum=200)

...

@MatthewLennie

@guillaumeeb Is this still current? It seems like there is an additional flag in the Worker CLI to trigger lifetime restarts, --lifetime-restart. Even with the additional flag this doesn't seem to work for me.

@guillaumeeb
Member

Is this still current?

I just tested with dask and distributed 2.22.0 and jobqueue 0.7.1, and it works.

It seems like there is an additional flag in the Worker CLI to trigger lifetime restarts, --lifetime-restart. Even with the additional flag this doesn't seem to work for me

You don't want to use this flag in this case, or your jobs will never terminate normally, they will be killed by the job scheduler due to walltime. What you want is your jobs ending before the walltime limit, so only --lifetime.

Did you test this with the code above, or with your own code?

@riedel
Member

riedel commented Oct 11, 2020

What is the correct way to evict a job from the schedulers point of view? Is SIGTERM enough for a worker to correctly terminate and transfer its memory, after dask/distributed#2844 gets merged?

Does graceful shutdown mean that memory gets moved?

I agree that workers should ideally terminate one after another; there should be a way to spin up replacements well in time or to choose staggered walltimes, since my problem was that all workers had approximately the same walltime and there would have been no way to handle the situation gracefully. What is the current policy here, and what would be a sensible policy? It does not make sense to transfer memory and tasks to the worker that dies next, I guess. What would be the idea for handling this?

It seems to me that adapt could have a tendency to kill the youngest jobs first, since they do not carry much data. Will specifying a lifetime take care of this? Or is this implicitly handled by adapt, because an empty new worker is spun up that is an ideal target?

@guillaumeeb
Member

@riedel sorry for the delay...

Just submitted a merge request to clarify things here and close this issue.

What is the correct way to evict a job from the schedulers point of view?

From the Dask-jobqueue perspective, just use the --lifetime option as explained above. This will trigger a graceful shutdown.

What is the current policy here and what would be a sensible policy?

Use the --lifetime-stagger option.

It does not make sense to transfer memory and tasks to the worker that dies next I guess. What would be the idea how to handle it?

Hmm, I guess there is no correct answer to this right now. The solution here probably won't work well on workers with a heavy memory load.

Will specifying lifetime take care of this?

I don't think so. The first-started worker will be killed first (with some uncertainty due to --lifetime-stagger). But here we've got no choice: this is a job queueing system walltime problem, and these workers would be killed anyway.

Or is this implicitely handled by adapt, because an empty new worker is spun up, that is an ideal target?

Not handled by adapt, but I guess the Scheduler has some logic for reassigning tasks and memory, and it will probably target the youngest and freest workers for this.
