Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration tests for spilling #229

Merged
merged 14 commits into from
Aug 18, 2022
62 changes: 62 additions & 0 deletions tests/stability/test_spill.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import uuid

import dask.array as da
import pytest
from coiled.v2 import Cluster
from dask.distributed import Client, wait


@pytest.mark.stability
@pytest.mark.parametrize("keep_around", (True, False))
def test_spilling(keep_around):
with Cluster(
name=f"test_spill-{uuid.uuid4().hex}",
n_workers=5,
worker_disk_size=55,
worker_vm_types=["t3.medium"],
wait_for_workers=True,
environ={
# Note: We set allowed-failures to ensure that no tasks are not retried
# upon ungraceful shutdown behavior during adaptive scaling
# but we receive a KilledWorker() instead.
"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"
},
) as cluster:
with Client(cluster) as client:
arr = da.random.random((200, 2**27)).persist() # 200 GiB
wait(arr)
fut = client.compute(arr.sum())
if not keep_around:
del arr
wait(fut)


@pytest.mark.skip(
reason="Skip until https://github.com/coiled/feedback/issues/185 is resolved."
)
@pytest.mark.stability
def test_tensordot_stress():
with Cluster(
name=f"test_spill-{uuid.uuid4().hex}",
n_workers=5,
worker_disk_size=55,
worker_vm_types=["t3.medium"],
wait_for_workers=True,
environ={
# Note: We set allowed-failures to ensure that no tasks are not retried
# upon ungraceful shutdown behavior during adaptive scaling
# but we receive a KilledWorker() instead.
"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0",
# We need to limit the number of connections to avoid getting `oom-killed`.
# See https://github.com/coiled/coiled-runtime/pull/229#discussion_r946807049
# for a longer discussion
"DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": "1",
"DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": "1",
Comment on lines +53 to +54
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to reduce the number of incoming and outgoing connections to avoid getting the worker oom-killed. With a default chunk size of 100 MiB from the dask.array, having 4 incoming and 4 outgoing connections at the same time means that we might use up to 800 MiB for communications. With the (previous) default of a t3.medium machine, this appears to be ~25 % of the actual memory that we can use. Together with the misconfigured Worker.memory_limit (coiled/feedback#185), we reliably trigger the oom-killer when running the workload without the adjusted config. See dask/distributed#6208 for an existing upstream issue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hendrikmakait is this safe to remove now that coiled/feedback#185 is resolved?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, there is an ongoing effort in dask/distributed#6208 to resolve the underlying issue. coiled/feedback#185 made this even worse.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I think we may need to skip this test for a while until this is a bit more stable (cf #280 )

},
) as cluster:
with Client(cluster) as client:
a = da.random.random((48 * 1024, 48 * 1024)) # 18 GiB
b = (a @ a.T).sum().round(3)
fut = client.compute(b)
wait(fut)
assert fut.result()