
setting distributed.scheduler.allowed-failures to 0 does not always work #6078

Closed
@WillWang-MindBridge

Description


What happened:
Set distributed.scheduler.allowed-failures to 0 and trigger a worker restart by filling up its memory. Sometimes (you may need to run the sample code several times to catch it), Dask appears to ignore that setting and retries the delayed function several times anyway.

What you expected to happen:
With allowed-failures set to 0, the task should not be retried; the first worker death should fail it immediately.

Minimal Complete Verifiable Example:

from time import sleep

import dask
import numpy as np
import pandas as pd
from dask import delayed
from distributed import Client, futures_of, LocalCluster


@delayed
def f1():
    print("running f1")
    sleep(5)
    # ~80 MB of float64 data: enough to push the 180 MiB worker past its memory limit
    df = pd.DataFrame(dict(row_id=np.zeros(10000000)))
    return df


def main():
    # allowed-failures=0: a task should not be retried after its worker dies
    with dask.config.set({'distributed.scheduler.allowed-failures': 0, "distributed.logging.distributed": "DEBUG"}):
        # a single small worker so the nanny restarts it once f1's result exceeds the limit
        with LocalCluster(n_workers=1, threads_per_worker=1, memory_limit="180MiB") as cluster, Client(cluster) as client:
            d = f1()
            future = futures_of(client.persist(d))[0]
            future.result()


if __name__ == "__main__":
    main()
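
As a sanity check (not part of the bug itself, just how one can confirm the setting reaches the scheduler), the value can be read back from the running scheduler. This is a minimal sketch assuming the same cluster/client setup as above; Scheduler.allowed_failures is the scheduler-side copy of the config value:

def get_allowed_failures(dask_scheduler=None):
    # dask_scheduler is injected by Client.run_on_scheduler
    return dask_scheduler.allowed_failures

# run inside the `with ... as client` block above
print(client.run_on_scheduler(get_allowed_failures))  # I would expect this to print 0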

Anything else we need to know?:
Logs showing that f1 is executed 3 times:

running f1
distributed.worker - WARNING - Worker is at 85% memory usage. Pausing worker.  Process memory: 153.12 MiB -- Worker memory limit: 180.00 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 153.12 MiB -- Worker memory limit: 180.00 MiB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.scheduler - ERROR - Couldn't gather keys {'f1-433077ca-b354-4bc5-987a-53cfa64a0d0a': ['tcp://127.0.0.1:55016']} state: ['no-worker'] workers: ['tcp://127.0.0.1:55016']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:55016'], f1-433077ca-b354-4bc5-987a-53cfa64a0d0a
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f1-433077ca-b354-4bc5-987a-53cfa64a0d0a': ('tcp://127.0.0.1:55016',)}
distributed.nanny - WARNING - Restarting worker
running f1
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.scheduler - ERROR - Couldn't gather keys {'f1-433077ca-b354-4bc5-987a-53cfa64a0d0a': ['tcp://127.0.0.1:55023']} state: ['no-worker'] workers: ['tcp://127.0.0.1:55023']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:55023'], f1-433077ca-b354-4bc5-987a-53cfa64a0d0a
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f1-433077ca-b354-4bc5-987a-53cfa64a0d0a': ('tcp://127.0.0.1:55023',)}
distributed.nanny - WARNING - Restarting worker
running f1
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
Traceback (most recent call last):
  File "/Users/my_user/git/my_project/toy12.py", line 28, in <module>
    main()
  File "/Users/my_user/git/my_project/toy12.py", line 23, in main
    future.result()
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 235, in result
    result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/utils.py", line 310, in sync
    return sync(
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/utils.py", line 364, in sync
    raise exc.with_traceback(tb)
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/utils.py", line 349, in f
    result[0] = yield future
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 260, in _result
    result = await self.client._gather([self])
  File "/Users/my_user/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 1811, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('f1-433077ca-b354-4bc5-987a-53cfa64a0d0a', <WorkerState 'tcp://127.0.0.1:55032', name: 0, status: closed, memory: 0, processing: 1>)

Process finished with exit code 1
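
For completeness, the repeated executions should also be visible in the scheduler's transition log for the key. Rough sketch of how one could pull it (assumes the same client and future as in the example above, while the cluster is still running; Scheduler.story returns the recorded transitions for a key):

def task_story(key, dask_scheduler=None):
    # dask_scheduler is injected by Client.run_on_scheduler
    return dask_scheduler.story(key)

try:
    future.result()
except Exception:
    pass  # KilledWorker expected here

for entry in client.run_on_scheduler(task_story, future.key):
    print(entry)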

Environment:

  • Dask version: 2021.12.0
  • Python version: Python 3.8.9
  • Operating System: macOS Big Sur 11.2.2
  • Install method (conda, pip, source): pip
Cluster Dump State:
