KeyError when looking at durations of executing tasks #4587

Closed
mrocklin opened this issue Mar 15, 2021 · 7 comments

Comments

@mrocklin
Member

mrocklin commented Mar 15, 2021

When stopping an in-flight computation I get the following traceback:

Traceback (most recent call last):
  File "/home/mrocklin/workspace/tornado/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/home/mrocklin/workspace/tornado/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/home/mrocklin/workspace/distributed/distributed/worker.py", line 942, in heartbeat
    response = await retry_operation(
  File "/home/mrocklin/workspace/distributed/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/home/mrocklin/workspace/distributed/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 861, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 660, in send_recv
    raise exc.with_traceback(tb)
  File "/home/mrocklin/workspace/distributed/distributed/core.py", line 496, in handle_comm
    result = handler(comm, **msg)
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3653, in heartbeat_worker
    ws._executing = {
  File "/home/mrocklin/workspace/distributed/distributed/scheduler.py", line 3654, in <dictcomp>
    parent._tasks[key]: duration for key, duration in executing.items()
KeyError: "('random_sample-qr-2c558c0a248d6692297cf169f1415126', 921, 0)"

The actual computation I was running was the following, but I suspect that this won't be hard to reproduce.

from dask.distributed import Client
client = Client(n_workers=20)

import dask.array as da, dask

# start one
x = da.random.random((10000000, 100), chunks=(10000, None))
u, s, v = da.linalg.svd(x)
u, s, v = dask.persist(u, s, v)

import time
time.sleep(3)  # This is just a guess, untested

# then decide to change it a bit and rerun
x = da.random.random((2000000, 100), chunks=(10000, None))
u, s, v = da.linalg.svd(x)
u, s, v = dask.persist(u, s, v)

cc @gforsyth @fjetter

@fredms

fredms commented May 15, 2021

Hi, will this issue be fixed soon? I have run into a similar issue several times. Could it be caused by unhealthy nodes?

distributed.core - ERROR - Exception while handling op heartbeat_worker
Traceback (most recent call last):
  File "/azureml-envs/azureml_332e860ba9095f5402ebec9f6bfc9c5b/lib/python3.7/site-packages/distributed/core.py", line 497, in handle_comm
    result = handler(comm, **msg)
  File "/azureml-envs/azureml_332e860ba9095f5402ebec9f6bfc9c5b/lib/python3.7/site-packages/distributed/scheduler.py", line 3821, in heartbeat_worker
    parent._tasks[key]: duration for key, duration in executing.items()
  File "/azureml-envs/azureml_332e860ba9095f5402ebec9f6bfc9c5b/lib/python3.7/site-packages/distributed/scheduler.py", line 3821, in <dictcomp>
    parent._tasks[key]: duration for key, duration in executing.items()
KeyError: "('repartition-write_df_local_and_run-len-chunk-c6628c1197d1328738f9a76edfb721ad', 0, 79, 0)"

@chukarsten

This is something we're also running into. Not sure why the task seems to be dropped.

@mrocklin
Member Author

@fjetter bringing this one to your attention again. I was curious and tried it under #5046 but didn't see an improvement yet.

@fjetter
Member

fjetter commented Jul 13, 2021

I could reproduce the error and fixed it in #5053. I wasn't able to make anything actually fail: the cluster kept operating as expected after the log message appeared, so the log itself shouldn't have been harmful.

The warning itself appears to be triggered by a subtle race condition after tasks are cancelled while the worker is still executing something. This is not related to the worker state machine but rather an artefact of network latencies.
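
For context, the traceback fails inside a dict comprehension in heartbeat_worker that maps worker-reported keys back to the scheduler's TaskState objects. Below is a minimal sketch of the kind of guard that would avoid the KeyError, assuming the fix simply skips keys the scheduler no longer knows about (the actual change in #5053 may differ):

# Illustrative sketch only; parent._tasks, executing and ws._executing are the
# names that appear in the traceback above.
ws._executing = {
    parent._tasks[key]: duration
    for key, duration in executing.items()
    if key in parent._tasks  # skip tasks the scheduler has already forgotten
}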

Cancellations may sometimes not be obvious. In the above example, the cancellation happens implicitly via garbage collection once the variables u, s, v are re-assigned.
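
For illustration, a sketch of how the same release could be made explicit instead of relying on garbage collection; it continues the reproducer above, where u, s, v are persisted collections and client is the dask.distributed.Client:

# Dropping the last references releases the persisted tasks (same effect as
# re-assigning u, s, v in the reproducer above).
del u, s, v

# Alternatively, futures can be cancelled directly via the public API:
fut = client.submit(sum, [1, 2, 3])
client.cancel(fut)  # explicit cancellation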

If anything else is interfering with the expected behaviour, it is most likely caused by something other than what this log message reports.

@chukarsten

The behavior we seem to be experiencing is that the task we're asking Dask to evaluate is "lost", and the scheduler appears to retry submitting and evaluating the job over and over. The logs we've seen sometimes show that this eventually succeeds and our CI continues as normal, only to stutter a few more times down the road. The end result is a drastic increase in runtime as Dask attempts, fails, and starts over to complete its work. I'd be happy to provide more information, but this is all occurring on GitLab external executors.

I can say that the way we're using Dask is perhaps a little unorthodox: we spin up LocalClusters on demand as the larger job, which uses Dask to parallelize smaller components, requests them. Something like:

from dask.distributed import Client, LocalCluster

def big_job():
    # Engine and do_smaller_jobs are our own application code
    cluster = LocalCluster(n_workers=2)
    engine = Engine(Client(cluster))
    results = do_smaller_jobs(engine)

Not sure whether my approach is causing something to accidentally get garbage-collected before it should....

@fjetter
Member

fjetter commented Jul 13, 2021

@chukarsten I understand why you perceive this kind of scheduling as unorthodox, but rest assured you're not the first one; we have a test for exactly this, see:

@gen_cluster(client=True, nthreads=[("127.0.0.1", 0)])
async def test_get_client_functions_spawn_clusters(c, s, a):
    # see gh4565
    scheduler_addr = c.scheduler.address

    def f(x):
        ref = None
        with LocalCluster(
            n_workers=1,
            processes=False,
            dashboard_address=False,
            worker_dashboard_address=False,
        ) as cluster2:
            with Client(cluster2) as c1:
                c2 = get_client()
                c1_scheduler = c1.scheduler.address
                c2_scheduler = c2.scheduler.address
                assert c1_scheduler != c2_scheduler
                assert c2_scheduler == scheduler_addr

    await c.gather(c.map(f, range(2)))
    await a.close()
    c_default = default_client()
    assert c is c_default

I don't see how this would interfere, since the small local cluster operates in a dedicated context and should not affect your global scheduler. I suspect a different issue is causing your tasks to be "lost". May I ask you to open another issue with a bit more information? For instance, is the "big" cluster or the "small" cluster losing jobs? How exactly are you scheduling tasks with the client (fut = client.submit(...), etc.)? Can you produce a minimal example? That would be extremely helpful.
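
To make that request concrete, here is a sketch of the kind of minimal, self-contained example that would help; the names small_job and the inner workload are placeholders, not part of anyone's actual code:

# Hypothetical skeleton of a minimal reproducer for the nested-cluster pattern.
from dask.distributed import Client, LocalCluster

def small_job(x):
    # Spin up a throwaway in-process cluster inside the task, as in the test above.
    with LocalCluster(n_workers=1, processes=False) as inner_cluster:
        with Client(inner_cluster) as inner_client:
            return inner_client.submit(lambda y: y + 1, x).result()

if __name__ == "__main__":
    with LocalCluster(n_workers=2) as outer_cluster:
        with Client(outer_cluster) as outer_client:
            futs = outer_client.map(small_job, range(10))
            print(outer_client.gather(futs))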

@mrocklin
Member Author

This seems to be resolved in main. Thanks @fjetter !
