From fd15b6f6ae487a6dfb7ba0a8266e100715269a74 Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 28 Oct 2022 17:36:16 +0200 Subject: [PATCH] Patch memory_limit --- distributed/shuffle/tests/test_multi_file.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/distributed/shuffle/tests/test_multi_file.py b/distributed/shuffle/tests/test_multi_file.py index b177e46a0f7..092caf9d26a 100644 --- a/distributed/shuffle/tests/test_multi_file.py +++ b/distributed/shuffle/tests/test_multi_file.py @@ -7,6 +7,8 @@ import pytest from tornado.ioloop import IOLoop +from dask.utils import parse_bytes + from distributed.shuffle._multi_file import MultiFile from distributed.utils_test import gen_test @@ -189,9 +191,15 @@ def gen_bytes(percentage: float) -> bytes: @pytest.mark.slow @gen_test() -async def test_memory_limit(tmp_path): +async def test_memory_limit(tmp_path, monkeypatch): # TODO: Memory limit concurrency is defined on interpreter level. Need to # test multiple instances + monkeypatch.setattr(MultiFile, "memory_limit", parse_bytes("10.0 MiB")) + import time + + def dump_slow(*args): + time.sleep(0.1) + dump(*args) big_payload = { "shard-1": [gen_bytes(2)] * 2, @@ -203,7 +211,7 @@ async def test_memory_limit(tmp_path): } async with MultiFile( directory=tmp_path, - dump=dump, + dump=dump_slow, load=load, loop=IOLoop.current(), ) as mf: @@ -230,13 +238,15 @@ async def test_memory_limit(tmp_path): await asyncio.wait_for(mf.put(small_payload), 0.05) -@pytest.mark.slow @gen_test() -async def test_memory_limit_blocked_exception(tmp_path): +async def test_memory_limit_blocked_exception(tmp_path, monkeypatch): # TODO: Memory limit concurrency is defined on interpreter level. Need to # test multiple instances + monkeypatch.setattr(MultiFile, "memory_limit", parse_bytes("10.0 MiB")) + import time def dump_only_bytes(data, f): + time.sleep(0.1) if not isinstance(data, bytes): raise TypeError("Wrong type") f.write(data)