diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py index 826f514a674..a76b655ef42 100644 --- a/python/dask_cudf/dask_cudf/expr/__init__.py +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -8,6 +8,9 @@ # Register custom expressions and collections if QUERY_PLANNING_ON: + # Broadly avoid "p2p" and "disk" defaults for now + config.set({"dataframe.shuffle.method": "tasks"}) + try: import dask_cudf.expr._collection import dask_cudf.expr._expr diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index 39eadb45c91..07fdb25dff9 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -16,9 +16,9 @@ dask_cuda = pytest.importorskip("dask_cuda") -def more_than_two_gpus(): +def at_least_n_gpus(n): ngpus = len(numba.cuda.gpus) - return ngpus >= 2 + return ngpus >= n @pytest.mark.parametrize("delayed", [True, False]) @@ -54,7 +54,7 @@ def test_merge(): @pytest.mark.skipif( - not more_than_two_gpus(), reason="Machine does not have more than two GPUs" + not at_least_n_gpus(2), reason="Machine does not have two GPUs" ) def test_ucx_seriesgroupby(): pytest.importorskip("ucp") @@ -97,3 +97,22 @@ def test_p2p_shuffle(): ddf.compute().sort_values("x"), check_index=False, ) + + +@pytest.mark.skipif( + not at_least_n_gpus(3), + reason="Machine does not have three GPUs", +) +def test_unique(): + # Using `"p2p"` can produce dispatching problems + # TODO: Test "p2p" after dask > 2024.4.1 is required + # See: https://github.com/dask/dask/pull/11040 + with dask_cuda.LocalCUDACluster(n_workers=3) as cluster: + with Client(cluster): + df = cudf.DataFrame({"x": ["a", "b", "c", "a", "a"]}) + ddf = dask_cudf.from_cudf(df, npartitions=2) + dd.assert_eq( + df.x.unique(), + ddf.x.unique().compute(), + check_index=False, + )