Skip to content

Commit

Permalink
Patch memory_limit
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Oct 28, 2022
1 parent 62c018b commit fd15b6f
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions distributed/shuffle/tests/test_multi_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import pytest
from tornado.ioloop import IOLoop

from dask.utils import parse_bytes

from distributed.shuffle._multi_file import MultiFile
from distributed.utils_test import gen_test

Expand Down Expand Up @@ -189,9 +191,15 @@ def gen_bytes(percentage: float) -> bytes:

@pytest.mark.slow
@gen_test()
async def test_memory_limit(tmp_path):
async def test_memory_limit(tmp_path, monkeypatch):
# TODO: Memory limit concurrency is defined on interpreter level. Need to
# test multiple instances
monkeypatch.setattr(MultiFile, "memory_limit", parse_bytes("10.0 MiB"))
import time

def dump_slow(*args):
time.sleep(0.1)
dump(*args)

big_payload = {
"shard-1": [gen_bytes(2)] * 2,
Expand All @@ -203,7 +211,7 @@ async def test_memory_limit(tmp_path):
}
async with MultiFile(
directory=tmp_path,
dump=dump,
dump=dump_slow,
load=load,
loop=IOLoop.current(),
) as mf:
Expand All @@ -230,13 +238,15 @@ async def test_memory_limit(tmp_path):
await asyncio.wait_for(mf.put(small_payload), 0.05)


@pytest.mark.slow
@gen_test()
async def test_memory_limit_blocked_exception(tmp_path):
async def test_memory_limit_blocked_exception(tmp_path, monkeypatch):
# TODO: Memory limit concurrency is defined on interpreter level. Need to
# test multiple instances
monkeypatch.setattr(MultiFile, "memory_limit", parse_bytes("10.0 MiB"))
import time

def dump_only_bytes(data, f):
time.sleep(0.1)
if not isinstance(data, bytes):
raise TypeError("Wrong type")
f.write(data)
Expand Down

0 comments on commit fd15b6f

Please sign in to comment.