Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make CI happy again #8560

Merged
merged 47 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
31366e1
Fix test_basic_merge use of hash_join (no longer public)
milesgranger Mar 7, 2024
f4153fc
await computation in shuffle/tests/test_merge.py::test_merge
milesgranger Mar 7, 2024
c7b39e7
xfail p2p shuffle RuntimeError in shuffle/tests/test_merge.py::test_m…
milesgranger Mar 7, 2024
f527f49
Check UserWarning in test_client.py::test_recreate_error_collection
milesgranger Mar 7, 2024
286903d
Check UserWarning in test_client.py::test_recreate_task_collection
milesgranger Mar 7, 2024
f71ea01
ignore UserWarning: dask-expr does not support DataFrameIOFunction pr…
milesgranger Mar 7, 2024
6fb111b
xfail test_steal.py::test_blocklist_shuffle_split blocked is empty
milesgranger Mar 7, 2024
e5f70f9
xfail test_scheduler.py::test_default_task_duration_splits - split_pr…
milesgranger Mar 7, 2024
5c712a0
test_merge_unknown_to_unknown: skip layer checks when dask-expr enabled
milesgranger Mar 7, 2024
6cc06a9
xfail test_dataframe_annotations when dask-expr enabled
milesgranger Mar 7, 2024
815422c
test_raise_on_complex_numbers - dask-expr branch
milesgranger Mar 7, 2024
9f25e6b
test_raise_on_sparse_data - dask-expr branch
milesgranger Mar 7, 2024
4daa85c
test_raise_on_non_string_column_name - dask-expr branch
milesgranger Mar 7, 2024
7ed678e
Don't use dd.shuffle.shuffle
hendrikmakait Mar 7, 2024
f6f5930
Don't test minimal version for dask-expr in P2P
hendrikmakait Mar 7, 2024
eb53213
No worker restrictions
hendrikmakait Mar 7, 2024
68d7edf
Adjust unpack prefix
hendrikmakait Mar 7, 2024
8c908a2
Restrictions are not implemented
hendrikmakait Mar 7, 2024
6025a64
Remove conditional skip
hendrikmakait Mar 7, 2024
967696c
Skip test_basic
hendrikmakait Mar 7, 2024
a1f5826
Fix test_metrics prefix name
hendrikmakait Mar 7, 2024
df07359
Remove xfail on UserWarnings
milesgranger Mar 7, 2024
d9dde2d
Avoid object to string conversion
hendrikmakait Mar 7, 2024
e748ed6
fix
hendrikmakait Mar 7, 2024
3c4e9e8
dask-contrib/dask-expr#951
hendrikmakait Mar 7, 2024
e4dfd00
Fix mindeps
hendrikmakait Mar 7, 2024
a64dfdb
xfail test_futures_of_sorted until dask-contrib/dask-expr#952
milesgranger Mar 7, 2024
302b8bb
Check UserWarning in test_highlevelgraph.py::test_dataframe_annotations
milesgranger Mar 7, 2024
a67ac1d
Check HashJoinP2P for test_basic_merge
milesgranger Mar 7, 2024
a165f16
Conditional await in test_merge.py::test_merge
milesgranger Mar 7, 2024
3f93b93
xfail test_dataframes
milesgranger Mar 8, 2024
3d6471f
xfail test_dataframe_set_index_sync - passes fine locally...
milesgranger Mar 8, 2024
cadfe01
Fix test_dataframes
hendrikmakait Mar 8, 2024
aff2a8c
test_recreate_error_collection on mindeps-pandas
crusaderky Mar 8, 2024
8dbb385
test_recreate_task_collection on mindeps-pandas
crusaderky Mar 8, 2024
32062fa
Review test_merge.py
crusaderky Mar 8, 2024
78a773f
Review test_futures_of_sorted
crusaderky Mar 8, 2024
3e9545d
Revert test_dataframe_set_index_sync
crusaderky Mar 8, 2024
9d90f90
Cosmetic
crusaderky Mar 8, 2024
8ea35f0
Use dask.bag for test_futures_of_sorted
crusaderky Mar 8, 2024
3240987
Revert test_broken_comm
crusaderky Mar 8, 2024
f5abd39
Review test_crashed_worker_after_shuffle
crusaderky Mar 8, 2024
9ab8ca6
test_crashed_worker_after_shuffle without Nanny
crusaderky Mar 8, 2024
e974509
Merge branch 'main' into milesgranger/fix-ci
hendrikmakait Mar 8, 2024
4e5c4d4
Revert changes about default durations
crusaderky Mar 8, 2024
a2bdc78
Merge remote-tracking branch 'milesgranger/milesgranger/fix-ci' into …
crusaderky Mar 8, 2024
1ccfad6
Revert nanny changes in test_crashed_worker_after_shuffle
crusaderky Mar 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed/protocol/tests/test_highlevelgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def fn(k):
assert plugin.resource_matches == B.npartitions


@pytest.mark.xfail(dd._dask_expr_enabled(), reason="Annotation is WIP in dask-expr")
@gen_cluster(client=True)
async def test_dataframe_annotations(c, s, a, b):
milesgranger marked this conversation as resolved.
Show resolved Hide resolved
retries = 5
Expand Down
37 changes: 25 additions & 12 deletions distributed/shuffle/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,16 @@ def test_raise_on_complex_numbers(dtype):
df = dd.from_pandas(
pd.DataFrame({"x": pd.array(range(10), dtype=dtype)}), npartitions=5
)
with pytest.raises(
TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")

if dd._dask_expr_enabled():
np = pytest.importorskip("numpy")
with pytest.raises(np.exceptions.ComplexWarning):
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
df.shuffle("x").optimize()
else:
with pytest.raises(
TypeError, match=f"p2p does not support data of type '{df.x.dtype}'"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")


@pytest.mark.xfail(
Expand All @@ -65,18 +71,25 @@ def test_raise_on_sparse_data():
df = dd.from_pandas(
pd.DataFrame({"x": pd.array(range(10), dtype="Sparse[float64]")}), npartitions=5
)
with pytest.raises(
TypeError, match="p2p does not support sparse data"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")
if dd._dask_expr_enabled():
df.shuffle("x").optimize()
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
else:
with pytest.raises(
TypeError, match="p2p does not support sparse data"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("x")


def test_raise_on_non_string_column_name():
df = dd.from_pandas(pd.DataFrame({"a": range(10), 1: range(10)}), npartitions=5)
with pytest.raises(
TypeError, match="p2p requires all column names to be str"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("a")

if dd._dask_expr_enabled():
df.shuffle("a").optimize()
hendrikmakait marked this conversation as resolved.
Show resolved Hide resolved
else:
with pytest.raises(
TypeError, match="p2p requires all column names to be str"
), dask.config.set({"dataframe.shuffle.method": "p2p"}):
df.shuffle("a")


def test_does_not_raise_on_stringified_numeric_column_name():
Expand Down
24 changes: 10 additions & 14 deletions distributed/shuffle/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from distributed import Worker
from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key
from distributed.shuffle._merge import hash_join
from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager
from distributed.utils_test import gen_cluster

Expand All @@ -21,7 +20,6 @@
import dask
from dask.dataframe._compat import PANDAS_GE_200, tm
from dask.dataframe.utils import assert_eq
from dask.utils_test import hlg_layer_topological

try:
import pyarrow as pa
Expand Down Expand Up @@ -52,6 +50,9 @@ def list_eq(aa, bb):
dd._compat.assert_numpy_array_equal(av, bv)


@pytest.mark.xfail(
reason="P2P shuffling failed during transfer phase", raises=RuntimeError
)
@gen_cluster(client=True)
async def test_minimal_version(c, s, a, b):
no_pyarrow_ctx = (
Expand Down Expand Up @@ -81,30 +82,27 @@ async def test_basic_merge(c, s, a, b, how):
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])

joined = hash_join(a, "y", b, "y", how)
joined = a.merge(b, left_on="y", right_on="y", how=how)
milesgranger marked this conversation as resolved.
Show resolved Hide resolved

assert not hlg_layer_topological(joined.dask, -1).is_materialized()
result = await c.compute(joined)
expected = pd.merge(A, B, how, "y")
list_eq(result, expected)

# Different columns and npartitions
joined = hash_join(a, "x", b, "z", "outer", npartitions=3)
assert not hlg_layer_topological(joined.dask, -1).is_materialized()
assert joined.npartitions == 3
joined = a.merge(b, left_on="x", right_on="z", how="outer")

result = await c.compute(joined)
expected = pd.merge(A, B, "outer", None, "x", "z")

list_eq(result, expected)

assert (
hash_join(a, "y", b, "y", "inner")._name
== hash_join(a, "y", b, "y", "inner")._name
a.merge(b, left_on="y", right_on="y", how="inner")._name
== a.merge(b, left_on="y", right_on="y", how="inner")._name
)
assert (
hash_join(a, "y", b, "y", "inner")._name
!= hash_join(a, "y", b, "y", "outer")._name
a.merge(b, left_on="y", right_on="y", how="inner")._name
!= a.merge(b, left_on="y", right_on="y", how="outer")._name
)


Expand Down Expand Up @@ -222,10 +220,8 @@ async def test_merge(c, s, a, b, how, disk):
await c.compute(dd.merge(A, b, how=how)),
pd.merge(A, B, how=how),
)
# Note: No await since A and B are both pandas dataframes and this doesn't
# actually submit anything
list_eq(
c.compute(dd.merge(A, B, how=how)),
milesgranger marked this conversation as resolved.
Show resolved Hide resolved
await c.compute(dd.merge(A, B, how=how)),
pd.merge(A, B, how=how),
)

Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_merge_column_and_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def test_merge_unknown_to_unknown(
# Merge unknown to unknown
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
result_graph = ddf_left_unknown.merge(ddf_right_unknown, on=on, how=how)
if not any(
if not dd._dask_expr_enabled() and not any(
isinstance(layer, (HashJoinP2PLayer, P2PShuffleLayer))
for layer in result_graph.dask.layers.values()
):
Expand Down
12 changes: 10 additions & 2 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4827,7 +4827,8 @@ def make_err(x):
raise ValueError
return x

df2 = df.a.map(make_err)
with pytest.warns(UserWarning, match="You did not provide metadata"):
df2 = df.a.map(make_err)
f = c.compute(df2)
error_f = await c._get_errored_future(f)
function, args, kwargs = await c._get_components_from_future(error_f)
Expand Down Expand Up @@ -4935,7 +4936,8 @@ async def test_recreate_task_collection(c, s, a, b):

df = dd.from_pandas(pd.DataFrame({"a": [0, 1, 2, 3, 4]}), chunksize=2)

df2 = df.a.map(lambda x: x + 1)
with pytest.warns(UserWarning, match="You did not provide metadata"):
df2 = df.a.map(lambda x: x + 1)
f = c.compute(df2)

function, args, kwargs = await c._get_components_from_future(f)
Expand Down Expand Up @@ -6442,6 +6444,9 @@ async def test_wait_for_workers(c, s, a, b):
assert "1 ms" in str(info.value)


@pytest.mark.xfail(
reason="https://github.com/dask-contrib/dask-expr/issues/945", raises=UserWarning
)
@pytest.mark.skipif(WINDOWS, reason="num_fds not supported on windows")
@pytest.mark.skipif(MACOS, reason="dask/distributed#8075")
@pytest.mark.parametrize(
Expand Down Expand Up @@ -6555,6 +6560,9 @@ async def test_config_inherited_by_subprocess():
assert await c.submit(dask.config.get, "foo") == 100


@pytest.mark.xfail(
reason="https://github.com/dask-contrib/dask-expr/issues/945", raises=UserWarning
)
@gen_cluster(client=True)
async def test_futures_of_sorted(c, s, a, b):
pytest.importorskip("dask.dataframe")
Expand Down
1 change: 1 addition & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2761,6 +2761,7 @@ async def test_get_task_duration(c, s, a, b):
assert len(s.unknown_durations["slowinc"]) == 1


@pytest.mark.xfail(reason="split_prefix expected len of 1", raises=AssertionError)
@gen_cluster(client=True)
async def test_default_task_duration_splits(c, s, a, b):
"""Ensure that the default task durations for shuffle split tasks are, by default,
Expand Down
1 change: 1 addition & 0 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,7 @@ async def test_balance_with_longer_task(c, s, a, b):
assert z.key in b.data


@pytest.mark.xfail(reason="blocked is empty", raises=AssertionError)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential blocker.
Discussion on test_scheduler.py::test_default_task_duration_splits above.

@gen_cluster(client=True)
async def test_blocklist_shuffle_split(c, s, a, b):
pd = pytest.importorskip("pandas")
Expand Down
3 changes: 3 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3446,6 +3446,9 @@ def get_data(self, comm, **kwargs):
return super().get_data(comm, **kwargs)


@pytest.mark.xfail(
reason="https://github.com/dask-contrib/dask-expr/issues/945", raises=UserWarning
)
milesgranger marked this conversation as resolved.
Show resolved Hide resolved
@pytest.mark.slow
@gen_cluster(client=True, Worker=BreakingWorker)
async def test_broken_comm(c, s, a, b):
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ addopts = '''
-p no:legacypath'''
filterwarnings = [
"error",
# https://github.com/dask-contrib/dask-expr/issues/945
'''ignore:dask_expr does not support the DataFrameIOFunction''',
'''ignore:Please use `dok_matrix` from the `scipy\.sparse` namespace, the `scipy\.sparse\.dok` namespace is deprecated.:DeprecationWarning''',
'''ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning''',
'''ignore:unclosed <socket\.socket.*:ResourceWarning''',
Expand Down
Loading