Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid "p2p" shuffle as a default when dask_cudf is imported #15469

Merged
merged 9 commits into from
Apr 8, 2024
3 changes: 3 additions & 0 deletions python/dask_cudf/dask_cudf/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

# Register custom expressions and collections
if QUERY_PLANNING_ON:
# Broadly avoid "p2p" and "disk" defaults for now
config.set({"dataframe.shuffle.method": "tasks"})

try:
import dask_cudf.expr._collection
import dask_cudf.expr._expr
Expand Down
22 changes: 19 additions & 3 deletions python/dask_cudf/dask_cudf/tests/test_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
dask_cuda = pytest.importorskip("dask_cuda")


def more_than_two_gpus():
def more_than_n_gpus(n):
ngpus = len(numba.cuda.gpus)
return ngpus >= 2
return ngpus >= n


@pytest.mark.parametrize("delayed", [True, False])
Expand Down Expand Up @@ -54,7 +54,7 @@ def test_merge():


@pytest.mark.skipif(
not more_than_two_gpus(), reason="Machine does not have more than two GPUs"
not more_than_n_gpus(2), reason="Machine does not have more than two GPUs"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
not more_than_n_gpus(2), reason="Machine does not have more than two GPUs"
not more_than_n_gpus(2), reason="Machine does not at least two GPUs"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just like the message was misleading, I believe the function name is too and I would suggest renaming it to at_least_n_gpus or something more accurate.

)
def test_ucx_seriesgroupby():
pytest.importorskip("ucp")
Expand Down Expand Up @@ -97,3 +97,19 @@ def test_p2p_shuffle():
ddf.compute().sort_values("x"),
check_index=False,
)


@pytest.mark.skipif(
not more_than_n_gpus(3),
reason="Machine does not have more than three GPUs",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
reason="Machine does not have more than three GPUs",
reason="Machine does not have at least three GPUs",

)
def test_unique():
with dask_cuda.LocalCUDACluster(n_workers=3) as cluster:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed >2 workers to reproduce the error locally. Not sure what the problem is yet, but the to-pyarrow dispatch is failing to register on one or more workers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that means the error is not reproducible by disabling P2P shuffle and this test confirms that, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR does something pretty simple, but the background is slightly confusing:

  • Using the latest release of dask, this test would fail without the global "tasks" config that is set in this PR
  • After Add lazy "cudf" registration for p2p-related dispatch functions dask/dask#11040, this test will also pass when the "p2p" shuffle is used
  • Even though "p2p" works for dask:main, I still think it makes sense to use the "tasks" default (at least for now). Although "p2p" should theoretically be more stable, I have not found this to be the case in practice for GPUs. Also, "tasks" is definitely faster.

with Client(cluster):
df = cudf.DataFrame({"x": ["a", "b", "c", "a", "a"]})
ddf = dask_cudf.from_cudf(df, npartitions=2)
dd.assert_eq(
df.x.unique(),
ddf.x.unique().compute(),
check_index=False,
)
Loading