Integration tests for spilling #229
Merged

Commits (14):
- f6a77be Spill/unspill 10x memory size (hendrikmakait)
- 47a77cb black (hendrikmakait)
- e97c21d isort and imports (hendrikmakait)
- e7d3e18 Adjust tests to return measurements if run standalone (hendrikmakait)
- 57e8fe8 Increase available disk (hendrikmakait)
- 6cbec3e Run test for version where data should be kept and where data should … (hendrikmakait)
- 78eca9c Add scaled tensordot_stress (hendrikmakait)
- 7122d53 Fix env vars (hendrikmakait)
- 0c1ccf5 flake8 (hendrikmakait)
- 9723d63 Minor (hendrikmakait)
- 1b19dc9 Add comments (hendrikmakait)
- 5da327a Increase disk size (hendrikmakait)
- 10de0fd Add test skipping (hendrikmakait)
- 4454587 black (hendrikmakait)
New file added (48 lines):
```python
import uuid

import dask.array as da
import pytest
from coiled.v2 import Cluster
from dask.distributed import Client, wait


@pytest.mark.stability
@pytest.mark.parametrize("keep_around", (True, False))
def test_spilling(keep_around):
    with Cluster(
        name=f"test_spill-{uuid.uuid4().hex}",
        n_workers=5,
        worker_disk_size=55,
        worker_vm_types=["t3.medium"],
        wait_for_workers=True,
        environ={"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0"},
    ) as cluster:
        with Client(cluster) as client:
            arr = da.random.random((200, 2**27)).persist()  # 200 GiB
            wait(arr)
            fut = client.compute(arr.sum())
            if not keep_around:
                del arr
            wait(fut)
```
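The 200 GiB figure in the code comment above checks out: `da.random.random` produces float64 values, i.e. 8 bytes per element. A quick sketch of the arithmetic; note the ~4 GiB RAM per t3.medium worker is an assumption from the AWS instance specs, not stated in the diff:

```python
# Total size of da.random.random((200, 2**27)): float64 = 8 bytes per element.
elements = 200 * 2**27
size_gib = elements * 8 / 2**30
print(size_gib)  # 200.0

# Assuming ~4 GiB of RAM per t3.medium worker, the 5-worker cluster holds
# about 20 GiB in memory, so persisting 200 GiB forces roughly 10x memory
# onto the 55 GB worker disks (cf. the "Spill/unspill 10x memory size" commit).
cluster_ram_gib = 5 * 4
print(size_gib / cluster_ram_gib)  # 10.0
```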
```python
@pytest.mark.stability
def test_tensordot_stress():
    with Cluster(
        name=f"test_spill-{uuid.uuid4().hex}",
        n_workers=5,
        worker_disk_size=45,
        worker_vm_types=["t3.medium"],
        wait_for_workers=True,
        environ={
            "DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "0",
            "DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING": "1",
            "DASK_DISTRIBUTED__WORKER__CONNECTIONS__OUTGOING": "1",
        },
    ) as cluster:
        with Client(cluster) as client:
            a = da.random.random((48 * 1024, 48 * 1024))  # 18 GiB
            b = (a @ a.T).sum().round(3)
            fut = client.compute(b)
            wait(fut)
            assert fut.result()
```
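The 18 GiB comment on `a` likewise matches the element count; a quick standalone check (not part of the PR):

```python
# da.random.random((48 * 1024, 48 * 1024)) holds n*n float64 values
# at 8 bytes each.
n = 48 * 1024
size_gib = n * n * 8 / 2**30
print(size_gib)  # 18.0
```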
We need to reduce the number of incoming and outgoing connections to avoid getting the worker oom-killed. With a default chunk size of 100 MiB from the dask.array, having 4 incoming and 4 outgoing connections at the same time means that we might use up to 800 MiB for communications. With the (previous) default of a t3.medium machine, this appears to be ~25% of the actual memory that we can use. Together with the misconfigured Worker.memory_limit (coiled/feedback#185), we reliably trigger the oom-killer when running the workload without the adjusted config. See dask/distributed#6208 for an existing upstream issue.
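The 800 MiB figure follows directly from the defaults quoted in the comment; the ~4 GiB total RAM for a t3.medium is an assumed figure from the AWS instance specs:

```python
chunk_mib = 100            # default dask.array chunk size quoted above
incoming, outgoing = 4, 4  # previous default per-worker connection limits
comm_mib = (incoming + outgoing) * chunk_mib
print(comm_mib)  # 800

# Against a t3.medium's ~4 GiB of RAM (assumed), 800 MiB is ~20% of total
# memory, consistent with "~25% of the actual memory that we can use" once
# OS and worker overhead are subtracted from the total.
print(round(comm_mib / 4096, 2))  # 0.2
```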
@hendrikmakait is this safe to remove now that coiled/feedback#185 is resolved?
No, there is an ongoing effort in dask/distributed#6208 to resolve the underlying issue. coiled/feedback#185 made this even worse.
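For context on the env vars under discussion: Dask translates `DASK_`-prefixed environment variables into nested config keys (double underscores become dots, names are lower-cased, and `_`/`-` in key names are treated interchangeably). A simplified, hypothetical helper mirroring that convention:

```python
def env_to_config_key(name: str) -> str:
    # Simplified sketch of dask's env-var convention: strip the DASK_
    # prefix, split on double underscores, lower-case each part.
    assert name.startswith("DASK_")
    return ".".join(part.lower() for part in name[len("DASK_"):].split("__"))

print(env_to_config_key("DASK_DISTRIBUTED__WORKER__CONNECTIONS__INCOMING"))
# distributed.worker.connections.incoming
```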
Thanks. I think we may need to skip this test for a while until this is a bit more stable (cf. #280).