From f6a77be2db32ca1cc75134b8f985fb37c06bf937 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 13:22:25 +0200 Subject: [PATCH 01/14] Spill/unspill 10x memory size --- tests/stability/test_spill.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 tests/stability/test_spill.py diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py new file mode 100644 index 0000000000..af6381567c --- /dev/null +++ b/tests/stability/test_spill.py @@ -0,0 +1,20 @@ +import pytest + +import dask +import dask.array as da +from distributed import wait + + +@pytest.mark.stability +def test_spilling(): + with dask.config.set({"distributed.scheduler.allowed-failures": 0}): + with Cluster( + name=f"test_spilling-{uuid.uuid4().hex}", + n_workers=5, + worker_disk_size=50, + wait_for_workers=True, + ) as cluster: + with Client(cluster) as client: + data = da.random.random((200, 2 ** 27)) # 200 GiB + wait(data.persist()) + data.sum().compute() From 47a77cbbecd9b69863f80d462f4d6063353844a7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 13:23:24 +0200 Subject: [PATCH 02/14] black --- tests/stability/test_spill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index af6381567c..26daa55b80 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -15,6 +15,6 @@ def test_spilling(): wait_for_workers=True, ) as cluster: with Client(cluster) as client: - data = da.random.random((200, 2 ** 27)) # 200 GiB + data = da.random.random((200, 2**27)) # 200 GiB wait(data.persist()) data.sum().compute() From e97c21d54395191eed07bac30e128a2c3a02cabc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 13:27:09 +0200 Subject: [PATCH 03/14] isort and imports --- tests/stability/test_spill.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/stability/test_spill.py 
b/tests/stability/test_spill.py index 26daa55b80..90b6d019ad 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -1,8 +1,8 @@ -import pytest - import dask import dask.array as da -from distributed import wait +import pytest +from coiled.v2 import Cluster +from dask.distributed import Client, wait @pytest.mark.stability From e7d3e18f835c6c1e0d56180f0ab88b0fa3bd175a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 13:45:21 +0200 Subject: [PATCH 04/14] Adjust tests to return measurements if run standalone --- tests/stability/test_spill.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 90b6d019ad..4dc6bc99c7 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -1,3 +1,5 @@ +import uuid + import dask import dask.array as da import pytest @@ -15,6 +17,8 @@ def test_spilling(): wait_for_workers=True, ) as cluster: with Client(cluster) as client: - data = da.random.random((200, 2**27)) # 200 GiB - wait(data.persist()) - data.sum().compute() + arr = da.random.random((200, 2**27)) # 200 GiB + wait(arr.persist()) + fut = client.compute(arr.sum()) + del arr + wait(fut) From 57e8fe8f4b1285ba740db080a1fb57ce773eea49 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 15:37:03 +0200 Subject: [PATCH 05/14] Increase available disk --- tests/stability/test_spill.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 4dc6bc99c7..cc1ee9d224 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -13,12 +13,12 @@ def test_spilling(): with Cluster( name=f"test_spilling-{uuid.uuid4().hex}", n_workers=5, - worker_disk_size=50, + worker_disk_size=55, wait_for_workers=True, ) as cluster: with Client(cluster) as client: - arr = da.random.random((200, 2**27)) # 200 GiB - wait(arr.persist()) + 
arr = da.random.random((200, 2**27)).persist() # 200 GiB + wait(arr) fut = client.compute(arr.sum()) del arr wait(fut) From 6cbec3eca13b6b08ec90fdcbd73d536a1ea7f2bc Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 5 Aug 2022 16:14:11 +0200 Subject: [PATCH 06/14] Run test for version where data should be kept and where data should be evicted --- tests/stability/test_spill.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index cc1ee9d224..302909ae49 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -8,10 +8,11 @@ @pytest.mark.stability -def test_spilling(): +@pytest.mark.parametrize("keep_around", (True, False)) +def test_spilling(keep_around): with dask.config.set({"distributed.scheduler.allowed-failures": 0}): with Cluster( - name=f"test_spilling-{uuid.uuid4().hex}", + name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, worker_disk_size=55, wait_for_workers=True, @@ -20,5 +21,6 @@ def test_spilling(): arr = da.random.random((200, 2**27)).persist() # 200 GiB wait(arr) fut = client.compute(arr.sum()) - del arr + if not keep_around: + del arr wait(fut) From 78eca9c8ac566156fd328c2c2b585409605d3f16 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 15 Aug 2022 15:56:04 +0200 Subject: [PATCH 07/14] Add scaled tensordot_stress --- tests/stability/test_spill.py | 51 +++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 302909ae49..5ef2a8aeb9 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -10,17 +10,40 @@ @pytest.mark.stability @pytest.mark.parametrize("keep_around", (True, False)) def test_spilling(keep_around): - with dask.config.set({"distributed.scheduler.allowed-failures": 0}): - with Cluster( - name=f"test_spill-{uuid.uuid4().hex}", - n_workers=5, - worker_disk_size=55, - 
wait_for_workers=True, - ) as cluster: - with Client(cluster) as client: - arr = da.random.random((200, 2**27)).persist() # 200 GiB - wait(arr) - fut = client.compute(arr.sum()) - if not keep_around: - del arr - wait(fut) + with Cluster( + name=f"test_spill-{uuid.uuid4().hex}", + n_workers=5, + worker_disk_size=55, + worker_vm_types="t3-medium", + wait_for_workers=True, + environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0}, + ) as cluster: + with Client(cluster) as client: + arr = da.random.random((200, 2**27)).persist() # 200 GiB + wait(arr) + fut = client.compute(arr.sum()) + if not keep_around: + del arr + wait(fut) + + +@pytest.mark.stability +def test_tensordot_stress(): + with Cluster( + name=f"test_spill-{uuid.uuid4().hex}", + n_workers=5, + worker_disk_size=45, + worker_vm_types="t3-medium", + wait_for_workers=True, + environ={ + "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0, + "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": 1, + "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": 1, + }, + ) as cluster: + with Client(cluster) as client: + a = da.random.random((48 * 1024, 48 * 1024)) # 18 GiB + b = (a @ a.T).sum().round(3) + fut = client.compute(b) + wait(fut) + assert fut.result() From 7122d5317009253dda9b7713a2a3b00a68046c19 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 16 Aug 2022 12:47:36 +0200 Subject: [PATCH 08/14] Fix env vars --- tests/stability/test_spill.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 5ef2a8aeb9..787c1b6b8c 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -14,9 +14,9 @@ def test_spilling(keep_around): name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, worker_disk_size=55, - worker_vm_types="t3-medium", + worker_vm_types="t3.medium", wait_for_workers=True, - environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0}, + 
environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, ) as cluster: with Client(cluster) as client: arr = da.random.random((200, 2**27)).persist() # 200 GiB @@ -33,12 +33,12 @@ def test_tensordot_stress(): name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, worker_disk_size=45, - worker_vm_types="t3-medium", + worker_vm_types="t3.medium", wait_for_workers=True, environ={ - "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": 0, - "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": 1, - "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": 1, + "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", + "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": "1", + "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": "1", }, ) as cluster: with Client(cluster) as client: From 0c1ccf5559bd86ff989eada26cf0a6e495770870 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 16 Aug 2022 12:52:42 +0200 Subject: [PATCH 09/14] flake8 --- tests/stability/test_spill.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 787c1b6b8c..a8ba3dd022 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -1,6 +1,5 @@ import uuid -import dask import dask.array as da import pytest from coiled.v2 import Cluster From 9723d63c2961760bea840b28aeb89e127f462746 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 16 Aug 2022 14:24:41 +0200 Subject: [PATCH 10/14] Minor --- tests/stability/test_spill.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index a8ba3dd022..d0d0235e13 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -13,7 +13,7 @@ def test_spilling(keep_around): name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, worker_disk_size=55, - worker_vm_types="t3.medium", + worker_vm_types=["t3.medium"], wait_for_workers=True, environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, 
) as cluster: @@ -32,7 +32,7 @@ def test_tensordot_stress(): name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, worker_disk_size=45, - worker_vm_types="t3.medium", + worker_vm_types=["t3.medium"], wait_for_workers=True, environ={ "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", From 1b19dc94401007edfc244d1f6b9a2a520bbb8b48 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Tue, 16 Aug 2022 15:58:56 +0200 Subject: [PATCH 11/14] Add comments --- tests/stability/test_spill.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index d0d0235e13..356845becd 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -15,7 +15,12 @@ def test_spilling(keep_around): worker_disk_size=55, worker_vm_types=["t3.medium"], wait_for_workers=True, - environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"}, + environ={ + # Note: We set allowed-failures to ensure that no tasks are retried + # upon ungraceful shutdown behavior during adaptive scaling + # but we receive a KilledWorker() instead. + "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0" + }, ) as cluster: with Client(cluster) as client: arr = da.random.random((200, 2**27)).persist() # 200 GiB @@ -35,7 +40,13 @@ def test_tensordot_stress(): worker_vm_types=["t3.medium"], wait_for_workers=True, environ={ + # Note: We set allowed-failures to ensure that no tasks are retried + # upon ungraceful shutdown behavior during adaptive scaling + # but we receive a KilledWorker() instead. "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0", + # We need to limit the number of connections to avoid getting `oom-killed`. 
+ # See https://github.com/coiled/coiled-runtime/pull/229#discussion_r946807049 + # for a longer discussion "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": "1", "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": "1", }, From 5da327a8862e3adc0af163edc84c635f3e9095fd Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 17 Aug 2022 11:36:34 +0200 Subject: [PATCH 12/14] Increase disk size --- tests/stability/test_spill.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index 356845becd..a9a8f76e55 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -36,7 +36,7 @@ def test_tensordot_stress(): with Cluster( name=f"test_spill-{uuid.uuid4().hex}", n_workers=5, - worker_disk_size=45, + worker_disk_size=55, worker_vm_types=["t3.medium"], wait_for_workers=True, environ={ From 10de0fd2c01dbff272359e687032546d46c04cd8 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 17 Aug 2022 15:21:08 +0200 Subject: [PATCH 13/14] Add test skipping --- tests/stability/test_spill.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index a9a8f76e55..bf903b218e 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -31,6 +31,7 @@ def test_spilling(keep_around): wait(fut) +@pytest.mark.skip(reason="Skip until https://github.com/coiled/feedback/issues/185 is resolved.") @pytest.mark.stability def test_tensordot_stress(): with Cluster( From 44545873e6cde2f1ff39e140048fdd3ecbd789cb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 17 Aug 2022 15:25:01 +0200 Subject: [PATCH 14/14] black --- tests/stability/test_spill.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/stability/test_spill.py b/tests/stability/test_spill.py index bf903b218e..dba00ea7b8 100644 --- a/tests/stability/test_spill.py +++ b/tests/stability/test_spill.py @@ -31,7 +31,9 @@ def 
test_spilling(keep_around): wait(fut) -@pytest.mark.skip(reason="Skip until https://github.com/coiled/feedback/issues/185 is resolved.") +@pytest.mark.skip( + reason="Skip until https://github.com/coiled/feedback/issues/185 is resolved." +) @pytest.mark.stability def test_tensordot_stress(): with Cluster(