[BUG] Critical: Force cudf.concat when passing in a cudf Series to MG Uniform Neighbor Sample #3416

Merged · 12 commits · Apr 5, 2023
38 changes: 28 additions & 10 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
@@ -396,23 +396,41 @@ def uniform_neighbor_sample(
else:
indices_t = numpy.int32

if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(start_list)

start_list = start_list.rename(start_col_name).to_frame()
start_list = start_list.rename(start_col_name)
if batch_id_list is not None:
ddf = start_list.join(batch_id_list.rename(batch_col_name))
batch_id_list = batch_id_list.rename(batch_col_name)
if hasattr(start_list, "compute"):
# mg input
start_list = start_list.to_frame()
batch_id_list = batch_id_list.to_frame()
ddf = start_list.merge(
batch_id_list,
how="left",
left_index=True,
right_index=True,
)
else:
# sg input
ddf = cudf.concat(
[
start_list,
batch_id_list,
],
axis=1,
)
else:
ddf = start_list
ddf = start_list.to_frame()
Comment on lines +405 to +423
@VibhuJawa (Member) · Apr 4, 2023

Do we really care about the index here? I think not. Does the below work?

start_list = start_list.reset_index(drop=True)
batch_id_list = batch_id_list.reset_index(drop=True)

if isinstance(start_list, dask_cudf.Series):
    ddf = dd.concat([start_list, batch_id_list], ignore_unknown_divisions=True, axis=1)
else:
    ddf = cudf.concat([start_list, batch_id_list], axis=1, ignore_index=True)

Member Author:

If we reset the index, can we still join the batch ids and start list correctly?

Member Author:

Also, in one of my first attempts at a solution I ran into an issue where dask_cudf.concat dropped the name of the series. dask_cudf.merge doesn't have that problem.
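
A minimal sketch of that check (assuming a local dask-cudf setup; exact behavior may vary by version):

import cudf
import dask.dataframe as dd
import dask_cudf

# Two named series, standing in for start_list and batch_id_list
a = dask_cudf.from_cudf(cudf.Series([0, 1, 2], name="start"), npartitions=1)
b = dask_cudf.from_cudf(cudf.Series([0, 0, 1], name="batch"), npartitions=1)

# Reported issue: the series names were lost on the concat path
print(dd.concat([a, b], axis=1).columns)

# The merge path used in this PR preserves the column names
print(a.to_frame().merge(b.to_frame(), left_index=True, right_index=True).columns)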

@VibhuJawa (Member) · Apr 4, 2023

I think we should be able to. From the logic you shared, we are merging on the index (left_index=True, right_index=True) in dask, which is the same thing but less efficient.

Edit: I also added ignore_index=True to make it more concrete in cuDF.

Member Author:

OK, let me try this.

Member Author:

@VibhuJawa I just confirmed this is not an issue with dask-cudf, it's an issue with our get_distributed_data function. I will make an issue for cugraph instead.

Member Author:

I'm not sure why calling merge instead of concat before get_distributed_data works, but for some reason the bug completely disappears with merge.

Contributor:

I can take a look too

Member:

Thanks for creating an issue.

Member Author:

I should link it here, sorry: #3420


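To summarize the resolution above, here is a minimal standalone sketch of the two input paths the merged fix distinguishes (names mirror the diff; illustrative only, assuming cudf and dask_cudf are installed):

import cudf

start_list = cudf.Series([0, 1, 2], name="start")
batch_id_list = cudf.Series([0, 0, 1], name="batch", dtype="int32")

if hasattr(start_list, "compute"):
    # mg input: dask_cudf objects expose .compute(); merge the frames on their index
    ddf = start_list.to_frame().merge(
        batch_id_list.to_frame(),
        how="left",
        left_index=True,
        right_index=True,
    )
else:
    # sg input: plain cudf objects; concatenate along columns
    ddf = cudf.concat([start_list, batch_id_list], axis=1)
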
if isinstance(ddf, cudf.DataFrame):
splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}
if input_graph.renumbered:
ddf = input_graph.lookup_internal_vertex_id(ddf, column_name=start_col_name)

else:
if hasattr(ddf, "compute"):
ddf = get_distributed_data(ddf)
wait(ddf)
ddf = ddf.worker_to_parts
else:
splits = cp.array_split(cp.arange(len(ddf)), len(Comms.get_workers()))
ddf = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(Comms.get_workers())}

client = get_client()
session_id = Comms.get_session_id()
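
When the combined frame is a plain cudf.DataFrame, the fallback above splits its rows evenly across the communicator's workers. A rough sketch of that pattern (the worker list here is a stand-in for Comms.get_workers()):

import cupy as cp
import cudf

ddf = cudf.DataFrame({"start": [0, 1, 2, 3, 4], "batch": [0, 0, 1, 1, 1]})
workers = ["tcp://worker-0", "tcp://worker-1"]  # stand-in for Comms.get_workers()

# Split the row positions into len(workers) roughly equal chunks,
# then map each worker to its slice of the frame
splits = cp.array_split(cp.arange(len(ddf)), len(workers))
parts = {w: [ddf.iloc[splits[i]]] for i, w in enumerate(workers)}
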
@@ -15,6 +15,7 @@
import os

import pytest
import cupy
import cudf
import dask_cudf
from pylibcugraph.testing.utils import gen_fixture_params_product
@@ -422,7 +423,7 @@ def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets):


@pytest.mark.mg
def test_uniform_neighbor_sample_edge_properties_self_loops():
def test_uniform_neighbor_sample_edge_properties_self_loops(dask_client):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
@@ -484,7 +485,9 @@ def test_uniform_neighbor_sample_edge_properties_self_loops():
@pytest.mark.skipif(
int(os.getenv("DASK_NUM_WORKERS", 2)) < 2, reason="too few workers to test"
)
def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replacement):
def test_uniform_neighbor_edge_properties_sample_small_start_list(
dask_client, with_replacement
):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
Expand Down Expand Up @@ -518,7 +521,7 @@ def test_uniform_neighbor_edge_properties_sample_small_start_list(with_replaceme


@pytest.mark.mg
def test_uniform_neighbor_sample_without_dask_inputs():
def test_uniform_neighbor_sample_without_dask_inputs(dask_client):
df = dask_cudf.from_cudf(
cudf.DataFrame(
{
@@ -573,6 +576,65 @@ def test_uniform_neighbor_sample_without_dask_inputs():
assert sorted(sampling_results.hop_id.values_host.tolist()) == [0, 0, 0, 1, 1, 1]


@pytest.mark.mg
@pytest.mark.parametrize("dataset", datasets)
@pytest.mark.parametrize("input_df", [cudf.DataFrame, dask_cudf.DataFrame])
@pytest.mark.parametrize("max_batches", [2, 8, 16, 32])
def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_batches):
num_workers = len(dask_client.scheduler_info()["workers"])

df = dataset.get_edgelist()
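# Attach edge id and type columns; with_edge_properties=True expects them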
df["eid"] = cupy.arange(len(df), dtype=df["src"].dtype)
df["etp"] = cupy.zeros_like(df["eid"].to_cupy())
ddf = dask_cudf.from_cudf(df, npartitions=num_workers)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(
ddf,
source="src",
destination="dst",
edge_attr=["wgt", "eid", "etp"],
legacy_renum_only=True,
)

input_vertices = dask_cudf.concat([df.src, df.dst]).unique().compute()
assert isinstance(input_vertices, cudf.Series)

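# Shuffle the indices so the concat/merge fix cannot rely on row order for alignment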
input_vertices.index = cupy.random.permutation(len(input_vertices))

input_batch = cudf.Series(
cupy.random.randint(0, max_batches, len(input_vertices)), dtype="int32"
)
input_batch.index = cupy.random.permutation(len(input_vertices))

if input_df == dask_cudf.DataFrame:
input_batch = dask_cudf.from_cudf(input_batch, npartitions=num_workers)
input_vertices = dask_cudf.from_cudf(input_vertices, npartitions=num_workers)

sampling_results = cugraph.dask.uniform_neighbor_sample(
G,
start_list=input_vertices,
batch_id_list=input_batch,
fanout_vals=[5, 5],
with_replacement=False,
with_edge_properties=True,
)

for batch_id in range(max_batches):
output_starts_per_batch = (
sampling_results[
(sampling_results.batch_id == batch_id) & (sampling_results.hop_id == 0)
]
.sources.nunique()
.compute()
)

input_starts_per_batch = len(input_batch[input_batch == batch_id])

# Should be <= to account for starts without outgoing edges
assert output_starts_per_batch <= input_starts_per_batch


# =============================================================================
# Benchmarks
# =============================================================================
@@ -581,7 +643,7 @@ def test_uniform_neighbor_sample_without_dask_inputs():
@pytest.mark.mg
@pytest.mark.slow
@pytest.mark.parametrize("n_samples", [1_000, 5_000, 10_000])
def bench_uniform_neigbour_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
def bench_uniform_neighbor_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
input_data_path = email_Eu_core.get_path()
chunksize = dcg.get_chunksize(input_data_path)
