
Error when scaling a function over a wider range of parameters (LSFCluster) #139

Closed

adamhaber opened this issue Aug 26, 2018 · 22 comments

@adamhaber

adamhaber commented Aug 26, 2018

Hi,

I'm trying to compute some function over a wide range of parameters using LSFCluster.

The general outline is as follows:

import itertools
from dask.delayed import delayed

def f(x, y):
    ...

tasks = [delayed(f)(x, y) for x, y in itertools.product(range(X), range(Y))]
futures = client.compute(tasks)
progress(futures)

When I try to compute with X=Y=20, everything goes smoothly.
However, when I increase the range of parameters over which I'm computing f(x,y) (for example, X=Y=100), I get an error message I don't understand:

distributed.scheduler - ERROR - '856313'
Traceback (most recent call last):
  File "/home/adamh/miniconda3/lib/python3.5/site-packages/distributed/scheduler.py", line 1267, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
  File "/home/adamh/miniconda3/lib/python3.5/site-packages/dask_jobqueue/core.py", line 61, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '856313'

Just to be sure, I ran bjobs -r and indeed I have a job running with job id 856313. I get similar error messages for many other workers.

Some more info which might be relevant:

  1. When I change f to be some really simple function (f(x,y)=x+y), the problem disappears.
  2. During runtime, f writes temporary files to a tmp directory - each f(x,y) creates its own temp directory - perhaps this involves too many disk operations?
  3. When I run everything locally, it takes forever (hence dask :-)) but doesn't crash and doesn't fill up the memory.

Any help would be much appreciated!

@guillaumeeb
Member

We've seen this error in other cases, see #117 for example. Currently, we believe it is due to workers restarting for some reason, which reveals a bug in dask-jobqueue code, see #138.

So the KeyError is probably not the root cause of your problem, just a consequence. It would be nice if you could check your worker logs and see whether there are any restarts. This is often due to an out-of-memory problem, but apparently that is not your case.
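
If it helps, here is a quick sketch for pulling recent worker logs from the client side, assuming your distributed version provides Client.get_worker_logs (otherwise, just look at the dask-worker.err/.out files directly):

# Sketch only: dump recent worker and nanny log lines from the client,
# assuming Client.get_worker_logs is available in your distributed version.
from dask.distributed import Client

client = Client(cluster)  # or reuse the client you already have

for address, records in client.get_worker_logs(nanny=True).items():
    print("===", address)
    for level, message in records:
        if "Restarting" in message or "killed" in message:
            print(level, message)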

@adamhaber
Author

adamhaber commented Aug 27, 2018

I searched for useful logging information; this is what I found so far:

  1. The dask-worker-space folder is filled with folders called either worker-XYZ or worker-XYZ.dirlock. All of them seem to be empty.
  2. I ran cat dask-worker.err | grep WARN and found different kinds of warnings; examples of the most common ones (which seemed informative to me) are:
distributed.worker - WARNING -  Compute Failed
distributed.nanny - WARNING - Worker process 7061 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.worker - WARNING - Heartbeat to scheduler failed
distributed.worker - WARNING - Could not deserialize task

So there were indeed restarts. Hope this helps...

@guillaumeeb
Member

What is your configuration of LSFCluster (either in jobqueue.yaml, or your constructor parameters)?

From what you say, I suspect you are using shared storage for the worker local-directory.

@adamhaber
Author

I haven't changed jobqueue.yaml at all - it's all commented out (it doesn't seem to have a section for LSF like it has for sge, slurm, pbs and moab).

In my constructor, I use:

cluster = LSFCluster(queue="new-medium", cores=8, memory="4G", job_extra=['-R rusage[mem=4000]'])

Regarding a shared space for worker local-directory - can you elaborate on this? I'm not sure I understand the problem (or how to try and fix it).

@guillaumeeb
Member

See https://dask-jobqueue.readthedocs.io/en/latest/configuration-setup.html#local-storage.

It is highly recommended to use storage local to the compute nodes for the local-directory kwarg: /tmp, /scratch, or whatever you have on your cluster.
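
For example, something like this (just a sketch; '/tmp' is a placeholder for whatever node-local disk you have):

# Point the workers at node-local storage instead of a shared file system.
# '/tmp' is only an example; use whatever local scratch your nodes provide.
from dask_jobqueue import LSFCluster

cluster = LSFCluster(queue="new-medium", cores=8, memory="4G",
                     job_extra=['-R rusage[mem=4000]'],
                     local_directory='/tmp')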

I don't know if this is what is causing your workers to restart, though. Could you post the log lines around

distributed.nanny - WARNING - Worker process 7061 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker

Does your job scheduler kill processes when they grow beyond some requested resource threshold?

@adamhaber
Author

adamhaber commented Aug 27, 2018

Most of these warnings were accompanied by variants of:

distributed.core - INFO - Event loop was unresponsive in Worker for 3.35s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability

And yes - LSF kills jobs that reach their maximum allocated memory. But like you said - I don't think that's what is happening here...

ADD: General question - are dask-worker.err, dask-worker.out and dask-worker-space recreated from scratch each time I start an LSFCluster client? I couldn't find this in the docs...

@adamhaber
Author

Regarding the local space - I think I've found the corresponding environment variable in LSF - aptly named LSF_TMPDIR. :-)

However, it's not clear to me how to pass this to LSFCluster (as in here). I've tried:

cluster = LSFCluster(queue="new-short",cores=2,memory="4GB", job_extra=["-R rusage[mem=4000]"],
                    local_directory="$LSF_TMPDIR")

But I got Error: --local-directory option requires an argument in dask-worker.err. Trying the same without the double quotes gives a syntax error.

Is there a better way to configure the LSF cluster to use LSF_TMPDIR as the local directory?

@guillaumeeb
Member

Looks like the LSF_TMPDIR variable is not set inside your submitted jobs, then. You could launch an interactive job on your cluster and check whether any useful environment variable is defined by printing the environment.

Otherwise, just try some local disk like /tmp, or ask your cluster admins whether there is local space to use on your compute nodes.
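
Once a couple of workers are up, you can also check what is actually defined inside the worker jobs from the client side; this is just a sketch using Client.run, which executes a function on every connected worker:

# Sketch: list temp/scratch-looking environment variables on each worker.
import os
from dask.distributed import Client

client = Client(cluster)  # or reuse the client you already have

def tmp_vars():
    return {k: v for k, v in os.environ.items() if "TMP" in k or "SCRATCH" in k}

print(client.run(tmp_vars))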

@adamhaber
Author

The nodes have both /tmp and /scratch - I've tried using both of them, but still got the same error message from above (about the unresponsive event loop)...

@guillaumeeb
Member

ADD: General question - are dask-worker.err, dask-worker.out and dask-worker-space recreated from scratch each time I start an LSFCluster client? I couldn't find this in the docs...

I'm using PBSCluster, which does not define common .err or .out files. If it did, though, they would be written to by each new job I launched, i.e. every time a single dask-worker job ends. This may not be a good default setting... With the default PBSCluster, I get one output file per worker job. I would recommend a PR adding the %J metacharacter to the template (see https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3/lsf_command_ref/bsub.e.1.html), or simply removing those lines from the template.
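
If you want to see exactly what the template generates for LSF (and therefore where the .err/.out lines come from), you can print the submission script; this is just for inspection:

# Print the generated bsub script to see how the output/error files
# and other directives are defined for this LSFCluster.
print(cluster.job_script())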

The nodes have both /tmp and /scratch - I've tried using both of them, but still got the same error message from above (about the unresponsive event loop)...

You should no longer see dask-worker-space locks or anything like that. But apparently this was not the root cause of the problem.

Could you:

  • Give a more complete stack trace from the dask-worker.err or .out files,
  • Maybe detail what kind of function f(x,y) is?

Ideally, we would need a small reproducible example here.

@adamhaber
Author

Sure - here's a toy script that reproduces the problem:

%load_ext autoreload
%autoreload 2

from dask.delayed import delayed
import shutil
import numpy as np
import os

from dask_jobqueue import LSFCluster
from dask.distributed import Client, progress

cluster = LSFCluster(queue="new-short", cores=2, memory="2GB", job_extra=["-R rusage[mem=4000]"],
                    local_directory='/tmp/')

cluster.scale(10)
client = Client(cluster)

def sim(i,r):
    import nest

    # each simulation should run in its own folder so that intermediate results don't overwrite each other
    tmp_folder = "/tmp/tmp_{0}".format(i)
    if os.path.exists(tmp_folder):
        shutil.rmtree(tmp_folder)
    
    os.makedirs(tmp_folder, mode=0o777)
    
    nest.ResetKernel()
    nest.SetKernelStatus({'data_path':tmp_folder})
    
    neuron = nest.Create("iaf_psc_alpha")
    noise = nest.Create("poisson_generator", 2)
    nest.SetStatus(noise, [{"rate": r}, {"rate": 15000.0}])

    sd = nest.Create("spike_detector")
    nest.SetStatus(sd, {"withgid": True, "withtime": True})

    nest.Connect(noise, neuron, syn_spec={'weight': [[1.2, -1.0]], 'delay': 1.0})
    nest.Connect(neuron, sd)

    nest.Simulate(1000.0)

    # delete the temp folder now that we're done with it
    shutil.rmtree(tmp_folder)
    res = nest.GetStatus(sd)[0]['events']['times']
    nest.ResetKernel()
    return res

results = [delayed(sim)(i,r) for i,r in enumerate(np.random.normal(80000,5000,size=5))]
x = client.compute(results)
progress(x)

NEST is a neural simulator kernel with a Python interface. As far as I understand, it needs to run in its own dedicated folder, hence all the os hacking.

When I run this with size=5 it works OK.
When I run this with size=1000 I get all sorts of different errors...

@guillaumeeb
Member

Hm, this is hard to reproduce as it needs an installation of nest; I will see whether I can easily install it or not.

Can you reproduce the behavior without nest? Could it be related to it?

@adamhaber
Author

I couldn't reproduce the behaviour without nest, so it's probably related.

I'll try to reproduce it locally (on a single multicore machine with dask and nest) and will post an update.

@adamhaber
Author

I can confirm that the error is not NEST-related - I get the same KeyError: '299511' error when I use a regular, numpy-only function instead.
Sometimes I also get distributed.utils - ERROR - Worker already exists tcp://....

@guillaumeeb
Member

I can confirm that the error is not NEST-related - I get the same KeyError: '299511' error when I use a regular, numpy-only function instead.

Could you provide a reproducible example of this problem with numpy only?

@adamhaber
Author

Sure - it's a simple function that takes a list of "event times" and "event senders" and returns a binarized raster of size n_senders x n_time_bins:

import numpy as np

def recording_to_rasters(rec, trial_duration_min=1, nodes=4, binning_res=20):
    trial_duration = trial_duration_min * 60 * 1000  # convert from minutes to ms
    number_of_bins = np.floor(trial_duration / binning_res).astype(int)
    senders = rec["events"]["senders"]
    times = rec["events"]["times"]
    senders = senders[times < trial_duration]
    times = times[times < trial_duration]
    binned_raster = np.zeros([nodes, number_of_bins])
    # the neuron IDs kept in "senders" start from 1
    for i, neuron in enumerate(range(1, nodes + 1)):
        binned_raster[i, np.floor(times[np.nonzero(senders == neuron)] / binning_res).astype(int)] = 1.
    return binned_raster.astype('uint32').T.copy(order='C')

When I do:

import pickle

recordings = [pickle.load(open("/data/rec_{0}.pkl".format(i), 'rb')) for i in range(10)]
rasters = [recording_to_rasters(rec) for rec in recordings]

It works fine. But when I do:

recordings = [pickle.load(open("/data/rec_{0}.pkl".format(i), 'rb')) for i in range(10)]
rasters = [delayed(recording_to_rasters)(rec) for rec in recordings]
x = client.compute(rasters)

I get the above mentioned errors.

@guillaumeeb
Member

guillaumeeb commented Aug 30, 2018

I'm missing your recording objects here: open("/data/rec_{0}.pkl".format(i),'rb'). Are those complicated to generate or replicate?

@guillaumeeb
Member

Another question: did you try using the Dask dashboard to diagnose any potential problem (memory, CPU, or any contention)?
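
In case it's not obvious where to find it: the scheduler serves the dashboard on port 8787 by default, and assuming a recent enough distributed, the URL is exposed on the client:

# Assuming your distributed version exposes it, this prints the dashboard URL,
# e.g. http://<scheduler-host>:8787/status
print(client.dashboard_link)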

@adamhaber
Author

I'm attaching an example of such a file - you can copy it a few times with different suffixes to replicate what I did above...

recs_0_github_example.zip

@guillaumeeb
Member

Hm, it seems I still need nest for your example:

---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-9-a859f630c53e> in <module>()
      1 import pickle
----> 2 recordings = [pickle.load(open("/home/eh/eynardbg/inputs/recs_{0}.pkl".format(i),'rb')) for i in range(10)]
      3 rasters = [delayed(recording_to_rasters)(rec) for rec in recordings]
      4 x = client.compute(res)

<ipython-input-9-a859f630c53e> in <listcomp>(.0)
      1 import pickle
----> 2 recordings = [pickle.load(open("/home/eh/eynardbg/inputs/recs_{0}.pkl".format(i),'rb')) for i in range(10)]
      3 rasters = [delayed(recording_to_rasters)(rec) for rec in recordings]
      4 x = client.compute(res)

ImportError: No module named 'pynestkernel'

I again recommend looking at the Dask dashboard to see whether anything weird (memory or CPU issues?) arises.

@guillaumeeb
Member

@adamhaber any update on this?

Otherwise I'm going to close it; it seems really specific to your environment or computation.

@guillaumeeb
Member

@adamhaber I'm closing this one. Right now we don't have enough information to help here.

Feel free to try the latest dask-jobqueue and dask versions and see if you notice some improvement.
Feel free to reopen this issue with further information or insights on how we could help you.
