Skip to content

Commit

Permalink
added test_unpack_remotedata
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Nov 22, 2022
1 parent 1feab21 commit dd7479a
Showing 1 changed file with 43 additions and 1 deletion.
44 changes: 43 additions & 1 deletion distributed/tests/test_utils_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,17 @@

import pytest

from dask.optimization import SubgraphCallable

from distributed.core import ConnectionPool
from distributed.utils_comm import gather_from_workers, pack_data, retry, subs_multiple
from distributed.utils_comm import (
WrappedKey,
gather_from_workers,
pack_data,
retry,
subs_multiple,
unpack_remotedata,
)
from distributed.utils_test import BrokenComm, gen_cluster


Expand Down Expand Up @@ -129,3 +138,36 @@ async def f():

assert n_calls == 6
assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0]


def test_unpack_remotedata():
def assert_eq(keys1: set[WrappedKey], keys2: set[WrappedKey]) -> None:
if len(keys1) != len(keys2):
assert False
if not keys1:
assert True
if not all(isinstance(k, WrappedKey) for k in keys1 & keys2):
assert False
assert sorted([k.key for k in keys1]) == sorted([k.key for k in keys2])

assert unpack_remotedata(1) == (1, set())
assert unpack_remotedata(()) == ((), set())

res, keys = unpack_remotedata(WrappedKey("mykey"))
assert res == "mykey"
assert_eq(keys, {WrappedKey("mykey")})

# Check unpack of SC that contains a wrapped key
sc = SubgraphCallable({"key": (WrappedKey("data"),)}, outkey="key", inkeys=["arg1"])
dsk = (sc, "arg1")
res, keys = unpack_remotedata(dsk)
assert res[0] != sc # Notice, the first item (the SC) has been changed
assert res[1:] == ("arg1", "data")
assert_eq(keys, {WrappedKey("data")})

# Check unpack of SC when it takes a wrapped key as argument
sc = SubgraphCallable({"key": ("arg1",)}, outkey="key", inkeys=[WrappedKey("arg1")])
dsk = (sc, "arg1")
res, keys = unpack_remotedata(dsk)
assert res == (sc, "arg1") # Notice, the first item (the SC) has NOT been changed
assert_eq(keys, set())

0 comments on commit dd7479a

Please sign in to comment.