Ensure correct data type #2847

Merged: 6 commits, Oct 26, 2022
28 changes: 21 additions & 7 deletions python/cugraph/cugraph/dask/traversal/bfs.py
@@ -35,15 +35,23 @@ def convert_to_cudf(cp_arrays):
return df


def _call_plc_bfs(sID, mg_graph_x, st_x, depth_limit=None, return_distances=True):
def _call_plc_bfs(
sID,
mg_graph_x,
st_x,
depth_limit=None,
direction_optimizing=False,
return_distances=True,
do_expensive_check=False,
):
return pylibcugraph_bfs(
ResourceHandle(Comms.get_handle(sID).getHandle()),
mg_graph_x,
cudf.Series(st_x, dtype="int32"),
False,
depth_limit if depth_limit is not None else 0,
return_distances,
True,
graph=mg_graph_x,
sources=st_x,
direction_optimizing=direction_optimizing,
depth_limit=depth_limit if depth_limit is not None else 0,
compute_predecessors=return_distances,
do_expensive_check=do_expensive_check,
)


@@ -156,6 +164,10 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
start = input_graph.lookup_internal_vertex_id(start, tmp_col_names)

data_start = get_distributed_data(start)
do_expensive_check = False
# FIXME: Why is 'direction_optimizing' not part of the python cugraph API
# and why is it set to 'False' by default
direction_optimizing = False

cupy_result = [
client.submit(
@@ -164,7 +176,9 @@ def bfs(input_graph, start, depth_limit=None, return_distances=True, check_start
input_graph._plc_graph[w],
st[0],
depth_limit,
direction_optimizing,
return_distances,
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
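For context, a sketch of how the MG `bfs` wrapper touched above is typically driven end to end. This is illustrative only: it assumes a `dask_cuda` cluster and the usual cugraph Comms setup, and `edges.csv` is a placeholder input file, not part of this PR.

```python
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf
import cugraph
import cugraph.dask as dcg
from cugraph.dask.comms import comms as Comms

# Stand up a local multi-GPU cluster and initialize cugraph communications.
cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

# Placeholder edge list; int32 vertex ids keep the renumbered dtype at int32.
ddf = dask_cudf.read_csv(
    "edges.csv", names=["src", "dst"], dtype=["int32", "int32"]
)
G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

# depth_limit / return_distances map onto _call_plc_bfs above;
# direction_optimizing and do_expensive_check stay at their internal defaults.
result = dcg.bfs(G, 0, depth_limit=None, return_distances=True)
print(result.compute().head())

Comms.destroy()
client.close()
cluster.close()
```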
39 changes: 33 additions & 6 deletions python/cugraph/cugraph/structure/number_map.py
@@ -258,15 +258,18 @@ def indirection_map(self, ddf, src_col_names, dst_col_names):

# Set global index
tmp_ddf = tmp_ddf.assign(idx=1)
tmp_ddf["global_id"] = tmp_ddf.idx.cumsum() - 1
# ensure the original vertex and the 'global_id' columns are
# of the same type unless the original vertex type is 'string'
tmp_ddf["global_id"] = tmp_ddf.idx.cumsum().astype(self.id_type) - 1
tmp_ddf = tmp_ddf.drop(columns="idx")
tmp_ddf = tmp_ddf.persist()
self.ddf = tmp_ddf
return tmp_ddf

def __init__(self, id_type=np.int32, renumber_type=None):
def __init__(self, renumber_id_type=np.int32, unrenumbered_id_type=np.int32):
self.implementation = None
self.id_type = id_type
self.renumber_id_type = renumber_id_type
self.unrenumbered_id_type = unrenumbered_id_type
# The default src/dst column names in the resulting renumbered
# dataframe. These may be updated by the renumbering methods if the
# input dataframe uses the default names.
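The `astype(self.id_type)` added to `indirection_map` exists because a scan on a 32-bit integer column can come back widened (cudf behaved this way at the time of this PR). A minimal sketch of the cast pattern, assuming a cudf install:

```python
import cudf

idx = cudf.Series([1, 1, 1, 1], dtype="int32")

# Without the cast, the scan result may be widened (e.g. to int64),
# so 'global_id' would no longer match the int32 vertex id columns.
global_id = idx.cumsum() - 1
print(global_id.dtype)

# With the cast, 'global_id' keeps the intended id dtype.
global_id = idx.cumsum().astype("int32") - 1
print(global_id.dtype)
```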
@@ -479,13 +482,29 @@ def renumber_and_segment(
legacy_renum_only=False,
):
renumbered = True

# For columns with mismatch dtypes, set the renumbered
# id_type to either 'int32' or 'int64'
if df.dtypes.nunique() > 1:
# can't determine the edgelist input type
unrenumbered_id_type = None
else:
unrenumbered_id_type = df.dtypes[0]

if np.int64 in list(df.dtypes):
renumber_id_type = np.int64
else:
# renumber the edgelist to 'int32'
renumber_id_type = np.int32

# FIXME: Drop the renumber_type 'experimental' once all the
# algos follow the C/Pylibcugraph path

# The renumber_type 'legacy' runs both the python and the
# C++ renumbering.
if isinstance(src_col_names, list):
renumber_type = "legacy"

elif not (
df[src_col_names].dtype == np.int32 or df[src_col_names].dtype == np.int64
):
@@ -500,7 +519,7 @@
renumber_type = "skip_renumbering"
renumbered = False

renumber_map = NumberMap()
renumber_map = NumberMap(renumber_id_type, unrenumbered_id_type)
if not isinstance(src_col_names, list):
src_col_names = [src_col_names]
dst_col_names = [dst_col_names]
@@ -512,11 +531,19 @@

if isinstance(df, cudf.DataFrame):
renumber_map.implementation = NumberMap.SingleGPU(
df, src_col_names, dst_col_names, renumber_map.id_type, store_transposed
df,
src_col_names,
dst_col_names,
renumber_map.renumber_id_type,
store_transposed,
)
elif isinstance(df, dask_cudf.DataFrame):
renumber_map.implementation = NumberMap.MultiGPU(
df, src_col_names, dst_col_names, renumber_map.id_type, store_transposed
df,
src_col_names,
dst_col_names,
renumber_map.renumber_id_type,
store_transposed,
)
else:
raise TypeError("df must be cudf.DataFrame or dask_cudf.DataFrame")
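The dtype-selection rule added to `renumber_and_segment` can be summarized with a small stand-alone sketch; `pick_id_types` is a hypothetical helper used only for illustration, with pandas standing in for cudf/dask_cudf:

```python
import numpy as np
import pandas as pd  # stand-in for cudf/dask_cudf in this sketch

def pick_id_types(df):
    # Mirrors the selection logic above:
    # - mixed column dtypes -> the unrenumbered id type cannot be determined
    # - any int64 column    -> renumber to int64, otherwise renumber to int32
    if df.dtypes.nunique() > 1:
        unrenumbered_id_type = None
    else:
        unrenumbered_id_type = df.dtypes.iloc[0]
    if np.int64 in list(df.dtypes):
        renumber_id_type = np.int64
    else:
        renumber_id_type = np.int32
    return renumber_id_type, unrenumbered_id_type

edges = pd.DataFrame(
    {"src": np.arange(3, dtype="int32"), "dst": np.arange(3, dtype="int64")}
)
print(pick_id_types(edges))  # mixed dtypes: (numpy.int64, None)
```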
25 changes: 23 additions & 2 deletions python/cugraph/cugraph/tests/mg/test_mg_renumber.py
@@ -308,7 +308,7 @@ def test_pagerank_string_vertex_ids(dask_client):
G.from_cudf_edgelist(df, source="src", destination="dst")

sg_results = cugraph.pagerank(G)
sg_results = sg_results.sort_values("pagerank").reset_index(drop=True)
sg_results = sg_results.sort_values("vertex").reset_index(drop=True)

# MG
ddf = dask_cudf.from_cudf(df, npartitions=2)
@@ -318,7 +318,28 @@ def test_pagerank_string_vertex_ids(dask_client):
mg_results = dcg.pagerank(G_dask)
# Organize results for easy comparison, this does not change the values. MG
# Pagerank defaults to float64, so convert to float32 when comparing to SG
mg_results = mg_results.compute().sort_values("pagerank").reset_index(drop=True)
mg_results = mg_results.compute().sort_values("vertex").reset_index(drop=True)
mg_results["pagerank"] = mg_results["pagerank"].astype("float32")

assert_frame_equal(sg_results, mg_results)


@pytest.mark.parametrize("dtype", ["int32", "int64"])
def test_mg_renumber_multi_column(dtype, dask_client):
df = cudf.DataFrame(
{"src_a": [i for i in range(0, 10)], "dst_a": [i for i in range(10, 20)]}
).astype(dtype)

df["src_b"] = df["src_a"] + 10
df["dst_b"] = df["dst_a"] + 20
src_col = ["src_a", "src_b"]
dst_col = ["dst_a", "dst_b"]

ddf = dask_cudf.from_cudf(df, npartitions=2)
edgelist_type = list(ddf.dtypes)
G = cugraph.Graph()
G.from_dask_cudf_edgelist(ddf, source=src_col, destination=dst_col)
renumbered_ddf = G.edgelist.edgelist_df
renumbered_edgelist_type = list(renumbered_ddf.dtypes)

assert set(renumbered_edgelist_type).issubset(set(edgelist_type))
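A single-GPU analogue of the new multi-column test, useful for a quick local check (a sketch assuming a working cudf/cugraph environment; it mirrors the MG test above rather than adding new coverage):

```python
import cudf
import cugraph

df = cudf.DataFrame(
    {"src_a": list(range(0, 10)), "dst_a": list(range(10, 20))}
).astype("int64")
df["src_b"] = df["src_a"] + 10
df["dst_b"] = df["dst_a"] + 20

G = cugraph.Graph()
G.from_cudf_edgelist(
    df, source=["src_a", "src_b"], destination=["dst_a", "dst_b"]
)

# Expectation per this PR: renumbering should not introduce a dtype that is
# wider than the dtypes already present in the input edge list.
print(set(df.dtypes))
print(set(G.edgelist.edgelist_df.dtypes))
```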