From f0747bb4d661ec54abd5c47eb8d65125c4d0d865 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:56:10 -0400 Subject: [PATCH 1/4] Enable queuing by default --- distributed/distributed.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 105a45e9009..46c3cd4bb72 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -22,7 +22,7 @@ distributed: events-log-length: 100000 work-stealing: True # workers should steal tasks from each other work-stealing-interval: 100ms # Callback time for work stealing - worker-saturation: .inf # Send this fraction of nthreads root tasks to workers + worker-saturation: 1.1 # Send this fraction of nthreads root tasks to workers worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings preload: [] # Run custom modules with Scheduler @@ -152,7 +152,7 @@ distributed: # Fractions of worker process memory at which we take action to avoid memory # blowup. Set any of the values to False to turn off the behavior entirely. # All fractions are relative to each worker's memory_limit. 
- transfer: 0.10 # fractional size of incoming data transfers where we start + transfer: 0.10 # fractional size of incoming data transfers where we start # throttling incoming data transfers target: 0.60 # fraction of managed memory where we start spilling to disk spill: 0.70 # fraction of process memory where we start spilling to disk From 8169cc6aa3dc2e978696e0bb59af85aae5efb15c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 12:56:18 -0400 Subject: [PATCH 2/4] Flip CI --- .github/workflows/tests.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 630073ffe59..598fe7a3359 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -24,7 +24,7 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] python-version: ["3.8", "3.9", "3.10"] - queuing: [no_queue] + queuing: [queue] # Cherry-pick test modules to split the overall runtime roughly in half partition: [ci1, not ci1] exclude: @@ -33,11 +33,11 @@ jobs: include: - os: ubuntu-latest python-version: 3.9 - queuing: queue + queuing: no_queue partition: "ci1" - os: ubuntu-latest python-version: 3.9 - queuing: queue + queuing: no_queue partition: "not ci1" # Uncomment to stress-test the test suite for random failures. 
@@ -144,8 +144,8 @@ jobs: - name: Set up dask env for job queuing shell: bash -l {0} - if: ${{ matrix.queuing == 'queue' }} - run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0" >> $GITHUB_ENV + if: ${{ matrix.queuing == 'no_queue' }} + run: echo "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=inf" >> $GITHUB_ENV - name: Print host info shell: bash -l {0} From 2ae7f04bd66d7c5884ca1d226d9fdc627fdbd8b3 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 25 Oct 2022 14:56:36 -0400 Subject: [PATCH 3/4] update `test_ready_remove_worker` --- distributed/tests/test_scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ce66ada35eb..59cc5b56e4a 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -913,8 +913,11 @@ def f(x, y=2): assert set(d) == {"function", "args"} +@pytest.mark.parametrize("worker_saturation", [1.0, float("inf")]) @gen_cluster() -async def test_ready_remove_worker(s, a, b): +async def test_ready_remove_worker(s, a, b, worker_saturation): + s.WORKER_SATURATION = worker_saturation + s.update_graph( tasks={"x-%d" % i: dumps_task((inc, i)) for i in range(20)}, keys=["x-%d" % i for i in range(20)], From 23135dada41a1c88584118f5fafbc2616c567947 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Mon, 31 Oct 2022 13:30:46 -0600 Subject: [PATCH 4/4] skip round robin for queuing hopefully will be able to remove this in the future. 
--- distributed/tests/test_scheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 59cc5b56e4a..25dcdcc5f40 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -2852,6 +2852,10 @@ async def test_get_worker_monitor_info(s, a, b): assert res[w.address]["last_time"] is not None +@pytest.mark.skipif( + math.isfinite(float(dask.config.get("distributed.scheduler.worker-saturation"))), + reason="Round-robin not enabled with queuing; see https://github.com/dask/distributed/issues/7197", +) @gen_cluster(client=True) async def test_quiet_cluster_round_robin(c, s, a, b): await c.submit(inc, 1)