
AssertionError (assert count > 0) in SLURMCluster._adapt #222

Closed
ogrisel opened this issue Jan 16, 2019 · 33 comments

@ogrisel

ogrisel commented Jan 16, 2019

When I run a SLURM cluster with adapt(), I sometimes get the following crash (it is not deterministic, and I have not found a way to trigger it reliably).

tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f59359aed90>, <Future finished exception=AssertionError()>)
Traceback (most recent call last):
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 758, in _run_callback
    ret = callback()
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
    future.result()
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 334, in _adapt
    workers = yield self._retire_workers(workers=recommendations['workers'])
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 242, in _retire_workers
    close_workers=True)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1141, in run
    yielded = self.gen.throw(*exc_info)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2800, in retire_workers
    n=1, delete=False)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1133, in run
    value = future.result()
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/tornado/gen.py", line 1147, in run
    yielded = self.gen.send(value)
  File "/scratch/ogrisel/miniconda3/lib/python3.7/site-packages/distributed/scheduler.py", line 2613, in replicate
    assert count > 0
AssertionError
@guillaumeeb
Member

So looking at https://github.com/dask/distributed/blob/master/distributed/scheduler.py#L2644, it looks like this comes from the replicate() function. I don't know this part of the code, but I imagine that:

  • You've triggered some computation on the cluster, which left some data in memory across your workers.
  • adapt() is killing your workers after the computation ends, but you still hold references to that data (Future objects, maybe).
  • So the scheduler tries to preserve the data by copying it to the remaining workers.
  • But adapt has killed them all, so there is nowhere left to copy it.

@mrocklin can you confirm or refute this?

The solutions I see: remove every reference to data you no longer need, or use the minimum kwarg with adapt, as in the sketch below.
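A minimal sketch of the second workaround (the queue, cores and memory values here are placeholders for your site's configuration):

from dask_jobqueue import SLURMCluster

# Placeholder resources; adjust queue/cores/memory for your SLURM site.
cluster = SLURMCluster(queue='normal', cores=4, memory='16GB')

# minimum=1 keeps at least one worker alive even when the cluster is idle,
# so data held in distributed memory always has a host to live on.
cluster.adapt(minimum=1, maximum=10)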

@mrocklin
Member

@guillaumeeb's interpretation is fundamentally correct, with a small clarification:

But adapt has killed everyone, it cannot copy it anymore.

There are no longer any workers that still have this data; it has been lost somehow. There may still be other workers, or the workers holding the data may have died from some cause other than adapt.

This seems like a bug to me. I don't have thoughts on how it might arise. It would be good to resolve.

@ogrisel
Author

ogrisel commented Jan 17, 2019

I think I got this error in a session with cluster.adapt(minimum=0, maximum=10). But my code is eager about retrieving results, and I think the cluster should not destroy the last worker as long as there is still live data on it.

@mrocklin
Copy link
Member

mrocklin commented Jan 17, 2019 via email

@guillaumeeb
Member

Hm, so this would be an adaptive bug. We would need to find a small example which reproduces it, ideally with LocalCluster if possible.

But it might come from how scale_up and scale_down are implemented here, though I don't think that's the case.

@guillaumeeb
Member

@ogrisel do you still encounter this bug?

@SimonBoothroyd

I can confirm this issue is still present (when used alongside distributed version 1.28.1), but unfortunately I haven't been able to reproduce it deterministically.

@guillaumeeb
Member

Can anyone give feedback on this one after the big rewrite #306 that just occurred?

@SimonBoothroyd

@guillaumeeb I've been using the new #306 release quite a bit and haven't seen this once so far, so it seems like this has possibly been fixed.

@ogrisel
Author

ogrisel commented Oct 13, 2019

I am no longer using dask on SLURM clusters so feel free to close if nobody else has seen this issue in a while.

@jhamman
Member

jhamman commented Oct 13, 2019

Thanks @SimonBoothroyd and @ogrisel for the updates. Let's close this then.

@pbrady32

pbrady32 commented Jan 9, 2020

Hey guys,

I'm seeing this exact same error using SLURMCluster. I've tried both cluster.adapt(minimum=1, maximum=15) and cluster.adapt(minimum=0, maximum=15).

Conda environment

dask                      2.9.0
dask-core                 2.9.0
dask-jobqueue             0.7.0
distributed               2.9.0
fastparquet               0.3.2

Here is the sequence of code I'm running, followed by an error that is essentially the same as @ogrisel's.

import dask.dataframe as dd

d_2006_pq = dd.read_parquet('/scratch/user/dask_data/diag_2006.parquet',
                            engine='fastparquet')

diag_list = ['290', '2900', '29010', '29011', '29012', '29013', '29020', 
             '29021', '2903', '2904', '29040', '29041', '29042', '29043', 
             '29282', '29410', '29411', '29420', '29421', '296', '300', 
             '30000', '30001', '30002', '30009', '311', '3310']

mask = d_2006_pq.loc[:, 'Diag'].isin(diag_list)

d_2006_pq2 = d_2006_pq.loc[mask, :]

d_2006_pq2['Patid'].value_counts().compute()

Error

distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/utils.py", line 663, in log_errors
    yield
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b70b7eca7d0>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 3161, in retire_workers
    delete=False,
  File "/home/pgbrady/.conda/envs/py37/lib/python3.7/site-packages/distributed/scheduler.py", line 2923, in replicate
    assert count > 0
AssertionError

@lesteve
Member

lesteve commented Jan 10, 2020

I know it is a rather big ask, but without a stand-alone snippet reproducing the problem, it is not going to be easy to debug...

@pbrady32

@lesteve Sorry for the likely naive question, but can you explain what you mean by a "stand-alone snippet"?

@lesteve
Member

lesteve commented Jan 15, 2020

No worries! It means some code that reproduces the problem and that anyone can run on their own computer. You can read https://stackoverflow.com/help/mcve for more details.
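For this issue, such a snippet would typically have the following shape (a minimal sketch, assuming the bug can be triggered without a real job queue; LocalCluster stands in for SLURMCluster):

from dask.distributed import Client, LocalCluster
import dask.array as da

# Start with no workers and let adaptive scaling spin them up and down.
cluster = LocalCluster(n_workers=0)
cluster.adapt(minimum=0, maximum=4)
client = Client(cluster)

# Any computation that leaves data in distributed memory while idle
# workers are being retired is a candidate trigger.
x = da.random.random((20000, 20000), chunks=(1000, 1000)).persist()
print(x.sum().compute())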

@pbrady32

Thanks! I'll have to think about how to make this feasible. The biggest hurdle in my case is that the data I'm using is proprietary, so I won't be able to hand it over.

@markkoob

markkoob commented Apr 2, 2020

I am also hitting this issue with an LSF cluster. In my case the data has been persisted to the cluster, but the workers seem to get closed before they can transfer their operands, leading to recomputation of some fairly expensive operations. If I prevent the workers from being closed via the adapt parameters, the issue does not appear.

I believe I can reproduce this case with generic data, but I don't believe it would be possible with a local cluster. Is an MRE which relies on a jobqueue cluster useful?

@lesteve
Member

lesteve commented Apr 2, 2020

Is an MRE which relies on a jobqueue cluster useful?

That would be very useful indeed!

This is likely a distributed bug, but having a way to reproduce it on an HPC cluster would definitely be a good start!

@markkoob

markkoob commented Apr 3, 2020

Alright, here's what I wound up with. You'll need to fill out start_cluster as appropriate for your HPC system. I think the key is that the cumsum operation is not trivial to parallelize, which leaves workers idle. In my case, adapt closes the idle workers before they can finish transferring their results to other workers.

I think the results may depend on network and worker performance; that can probably be minimized by configuring adapt to be more aggressive about closing workers, perhaps cluster.adapt(interval='100 ms', wait_count=1).

import dask.dataframe as dd
import dask_ml.datasets

from dask.distributed import Client

def start_cluster():
    # Replace these two lines with your HPC cluster startup.
    # I used 20*2**30 for worker memory.
    from dask_jobqueue import LSFCluster
    cluster = LSFCluster()

    cluster.adapt(minimum=0, maximum=4)
    client = Client(cluster)
    return client

client = start_cluster()

n_features = 250000
X = dask_ml.datasets.make_blobs(n_samples=10000, n_features=n_features,
                                chunks=(250, n_features))

dfx = dd.from_dask_array(X[0])

dfx = client.persist(dfx)
dfx['id'] = 1
dfx['id'] = dfx['id'].cumsum()
# set_index returns a new dataframe; assign it so the sort takes effect.
dfx = dfx.set_index('id', sorted=True)

dfx.rename(columns=str).to_parquet('mre.parq')

@lesteve
Member

lesteve commented Apr 3, 2020

Thanks for the snippet!

For future reference, it would be very nice if you could edit your message and add the following info:

  • dask, distributed and dask-jobqueue versions;
  • the full stack trace (even if it is long, you can use <details> to make the content foldable);
  • whether your snippet reproduces the bug every single time, or you have to launch it a few times;
  • a rough estimate of how long it runs before it fails.

@markkoob

markkoob commented Apr 3, 2020

Oh geez, sorry, that would be helpful, wouldn't it. Stack trace and versions are below, although the details tag seems to have negatively impacted the formatting :(

This example takes less than 10 minutes to run with four machines on my cluster, and it has reproduced the error every time (n=3).

Please do let me know if I can provide anything else that would be helpful!

Stack Trace
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b84f7ce9190>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at ./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/fastparquet/dataframe.py:5: FutureWarning: pandas.core.index is deprecated and will be removed in a future version.  The public classes are available in the top-level namespace.
  from pandas.core.index import CategoricalIndex, RangeIndex, Index, MultiIndex
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b84f7ce9190>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at ./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b84f7ce9190>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at ./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b84f7ce9190>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at ./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/deploy/adaptive.py", line 156, in scale_down
    names=workers, remove=True, close_workers=True
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 686, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 556, in send_recv
    raise exc.with_traceback(tb)
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/core.py", line 412, in handle_comm
    result = await result
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3424, in retire_workers
    lock=False,
  File "./conda_py3.7.4/1.4/linux64/lib/python3.7/site-packages/distributed/scheduler.py", line 3178, in replicate
    assert count > 0
AssertionError
client.get_versions()
{'scheduler': {'host': (('python', '3.7.4.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '2.6.32-696.18.7.el6.x86_64'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'C'),
   ('LANG', 'C')),
  'packages': {'dask': '2.10.1',
   'distributed': '2.10.0',
   'msgpack': '0.6.1',
   'cloudpickle': '1.2.2',
   'tornado': '6.0.3',
   'toolz': '0.10.0',
   'numpy': '1.18.1',
   'lz4': None,
   'blosc': None}},
 'workers': {'tcp://10.6.16.115:43394': {'host': (('python', '3.7.4.final.0'),
    ('python-bits', 64),
    ('OS', 'Linux'),
    ('OS-release', '2.6.32-696.18.7.el6.x86_64'),
    ('machine', 'x86_64'),
    ('processor', 'x86_64'),
    ('byteorder', 'little'),
    ('LC_ALL', 'C'),
    ('LANG', 'C')),
   'packages': {'dask': '2.10.1',
    'distributed': '2.10.0',
    'msgpack': '0.6.1',
    'cloudpickle': '1.2.2',
    'tornado': '6.0.3',
    'toolz': '0.10.0',
    'numpy': '1.18.1',
    'lz4': None,
    'blosc': None}}},
 'client': {'host': [('python', '3.7.4.final.0'),
   ('python-bits', 64),
   ('OS', 'Linux'),
   ('OS-release', '2.6.32-696.18.7.el6.x86_64'),
   ('machine', 'x86_64'),
   ('processor', 'x86_64'),
   ('byteorder', 'little'),
   ('LC_ALL', 'C'),
   ('LANG', 'C')],
  'packages': {'dask': '2.10.1',
   'distributed': '2.10.0',
   'msgpack': '0.6.1',
   'cloudpickle': '1.2.2',
   'tornado': '6.0.3',
   'toolz': '0.10.0',
   'numpy': '1.18.1',
   'lz4': None,
   'blosc': None}}}

dask_jobqueue.__version__ is 0.7.0

@lesteve
Member

lesteve commented Apr 3, 2020

Great, thanks! I have edited your message (formatting inside <details> is a bit finicky; you need to skip a few lines here and there I think, not 100% sure where, but I got it to work after a few attempts).

@lesteve
Member

lesteve commented Apr 3, 2020

I am going to reopen this. I am not sure I will have much time in the short term, but I'll try to reproduce it.

@lesteve lesteve reopened this Apr 3, 2020
@msmicker

I am seeing this same error (as @pbrady32). I am using adaptive scaling with an SGECluster. I had it running without any issues before; I can't be sure, but the only new aspect is writing to Parquet. Perhaps something related to both adaptive scaling and Parquet?

@msmicker

msmicker commented May 6, 2020

Here are more details from my situation. It seems to be generic to jobqueue, or perhaps to distributed (since I am using SGECluster)?

  • this only happens when I am dealing with adaptive scaling and Parquet files (though my other usage is fairly limited, e.g. single large CSV and JSON files)
  • the error does not always occur; it is more of a warning, since the block completes
  • it can occur early, in the middle, or late in a process

Let me know if I can offer more details to help.

My versions

dask                      2.14.0                     py_0    conda-forge
dask-core                 2.14.0                     py_0    conda-forge
dask-jobqueue             0.7.1                      py_0    conda-forge
dask-labextension         2.0.1                      py_0    conda-forge
distributed               2.14.0           py38h32f6830_0    conda-forge
Error stacktrace
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3440, in retire_workers
    await self.replicate(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3199, in replicate
    assert count > 0
AssertionError
distributed.core - ERROR - 
Traceback (most recent call last):
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 411, in handle_comm
    result = await result
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3440, in retire_workers
    await self.replicate(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3199, in replicate
    assert count > 0
AssertionError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/utils.py", line 665, in log_errors
    yield
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/deploy/adaptive.py", line 185, in scale_down
    await self.scheduler.retire_workers(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 687, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 557, in send_recv
    raise exc.with_traceback(tb)
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 411, in handle_comm
    result = await result
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3440, in retire_workers
    await self.replicate(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3199, in replicate
    assert count > 0
AssertionError
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2aaab4ff5ca0>>, <Task finished name='Task-97235' coro=<AdaptiveCore.adapt() done, defined at /hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py:170> exception=AssertionError()>)
Traceback (most recent call last):
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/deploy/adaptive_core.py", line 194, in adapt
    await self.scale_down(**recommendations)
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/deploy/adaptive.py", line 185, in scale_down
    await self.scheduler.retire_workers(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 687, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 557, in send_recv
    raise exc.with_traceback(tb)
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/core.py", line 411, in handle_comm
    result = await result
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3440, in retire_workers
    await self.replicate(
  File "/hpc/home/user-id/.conda/envs/jupyterlab/lib/python3.8/site-packages/distributed/scheduler.py", line 3199, in replicate
    assert count > 0
AssertionError

@lesteve
Member

lesteve commented May 6, 2020

This is very likely a distributed problem, but to be 100% sure it would be necessary to reproduce it with a LocalCluster. If someone manages to do that, it would be great to open an issue in the distributed tracker.

Just in case, can you try to use the latest dask and distributed versions (2.15 and 2.15.2 at the time of writing)?

@jennalc

jennalc commented Nov 26, 2020

I also hit this error in some cases when using the ConductorCluster from #472 in adaptive mode. As with previous commenters, the error does not occur in all cases and happens sporadically; re-running the cell is typically successful.

Environment:

dask 2.30.0
distributed 2.30.1
dask-jobqueue 0.7.1

@guillaumeeb
Member

Thanks @jennalc for the update. Are you also using Parquet, or is it unrelated to that?

@jennalc

jennalc commented Jan 25, 2021

@guillaumeeb - No, I was not using Parquet; I was working with a large CSV.
I did some searching in the distributed repo a few weeks ago and found a reference there from a few years ago indicating that this is not specific to dask-jobqueue, as suggested earlier in this issue:
dask/distributed#1930

@jrbourbeau
Member

Some operations, e.g. Client.replicate, assume a non-active cluster and aren't robust to things like workers going away or coming online. It looks like the particular assert that's getting triggered here is related to that. FWIW, we hope to improve this situation in the future to make these operations more robust.
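For illustration, replicate itself is simple to call (a minimal sketch; the point is that it plans against the worker set it sees at call time):

from dask.distributed import Client

client = Client()  # any running cluster

# Scatter one piece of data, then ask the scheduler for two copies of it.
[future] = client.scatter([list(range(1000))])
client.replicate([future], n=2)

# replicate() counts live copies of each key as it runs; if adaptive
# scaling retires the holding workers concurrently, that count can reach
# zero and trip the `assert count > 0` seen in this issue.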

@guillaumeeb
Member

OK, thanks both. Closing this again, as it is quite probably linked to the wrong behaviour of some functions when used with adaptive clusters.

Not related to dask-jobqueue specifically. But I'd say this is probably very visible with it, as adaptive workers can come and go very fast on a cluster with free resources!

@nsmith-

nsmith- commented Mar 4, 2021

I ran into this too with my own cluster implementation (mostly HTCondorCluster, but using the htcondor bindings directly and implementing async), and I am wondering whether the root cause may also explain why I saw some cases where the cluster lost a future and had to recompute rather large chunks of the task graph. Is it possible the adaptive deployment is somehow force-closing the workers before the futures have a chance to migrate? I've been struggling to understand exactly when and how the async ProcessInterface.close is called.

@guillaumeeb
Member

Is it possible the adaptive deployment is somehow force closing the workers before the futures have a chance to migrate?

Adaptive clusters are a tricky set-up. There might still be some edge cases where things fail to close properly. But I think you'd better ask this question on the distributed issue tracker, though not before having some code that reproduces it a bit ;). I know, this is sometimes almost impossible...
