diff --git a/python/cugraph/cugraph/gnn/__init__.py b/python/cugraph/cugraph/gnn/__init__.py index 9d468905a61..b2f8d3e9e83 100644 --- a/python/cugraph/cugraph/gnn/__init__.py +++ b/python/cugraph/cugraph/gnn/__init__.py @@ -12,4 +12,4 @@ # limitations under the License. from .graph_store import CuGraphStore -from .graph_store import CuFeatureStorage +from .dgl_extensions.feature_storage import CuFeatureStorage diff --git a/python/cugraph/cugraph/gnn/dgl_extensions/__init__.py b/python/cugraph/cugraph/gnn/dgl_extensions/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cugraph/cugraph/gnn/dgl_extensions/feature_storage.py b/python/cugraph/cugraph/gnn/dgl_extensions/feature_storage.py new file mode 100644 index 00000000000..244dfa8b621 --- /dev/null +++ b/python/cugraph/cugraph/gnn/dgl_extensions/feature_storage.py @@ -0,0 +1,100 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import cudf +import dask_cudf +import cupy as cp +from cugraph.experimental import MGPropertyGraph + + +class CuFeatureStorage: + """ + Storage for node/edge feature data. 
+ """ + + def __init__( + self, pg, columns, storage_type, backend_lib="torch", indices_offset=0 + ): + self.pg = pg + self.columns = columns + if backend_lib == "torch": + from torch.utils.dlpack import from_dlpack + elif backend_lib == "tf": + from tensorflow.experimental.dlpack import from_dlpack + elif backend_lib == "cupy": + from cupy import from_dlpack + else: + raise NotImplementedError( + f"Only PyTorch ('torch'), TensorFlow ('tf'), and CuPy ('cupy') " + f"backends are currently supported, got {backend_lib=}" + ) + if storage_type not in ["edge", "node"]: + raise NotImplementedError("Only edge and node storage is supported") + + self.storage_type = storage_type + + self.from_dlpack = from_dlpack + self.indices_offset = indices_offset + + def fetch(self, indices, device=None, pin_memory=False, **kwargs): + """Fetch the features of the given node/edge IDs to the + given device. + + Parameters + ---------- + indices : Tensor + Node or edge IDs. + device : Device + Device context. + pin_memory : + + Returns + ------- + Tensor + Feature data stored in PyTorch Tensor. + """ + # Default implementation uses synchronous fetch. 
+ + indices = cp.asarray(indices) + if isinstance(self.pg, MGPropertyGraph): + # dask_cudf loc breaks if we provide cudf series/cupy array + # https://github.com/rapidsai/cudf/issues/11877 + indices = indices.get() + else: + indices = cudf.Series(indices) + + indices = indices + self.indices_offset + + if self.storage_type == "node": + subset_df = self.pg.get_vertex_data( + vertex_ids=indices, columns=self.columns + ) + else: + subset_df = self.pg.get_edge_data(edge_ids=indices, columns=self.columns) + + subset_df = subset_df[self.columns] + + if isinstance(subset_df, dask_cudf.DataFrame): + subset_df = subset_df.compute() + + if len(subset_df) == 0: + raise ValueError(f"indices = {indices} not found in FeatureStorage") + cap = subset_df.to_dlpack() + tensor = self.from_dlpack(cap) + del cap + if device: + if not isinstance(tensor, cp.ndarray): + # Can't transfer to different device for cupy + tensor = tensor.to(device) + return tensor diff --git a/python/cugraph/cugraph/gnn/dgl_extensions/utils/__init__.py b/python/cugraph/cugraph/gnn/dgl_extensions/utils/__init__.py new file mode 100644 index 00000000000..b04c7e4b5f5 --- /dev/null +++ b/python/cugraph/cugraph/gnn/dgl_extensions/utils/__init__.py @@ -0,0 +1,12 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/python/cugraph/cugraph/gnn/dgl_extensions/utils/add_data.py b/python/cugraph/cugraph/gnn/dgl_extensions/utils/add_data.py new file mode 100644 index 00000000000..89614606dd3 --- /dev/null +++ b/python/cugraph/cugraph/gnn/dgl_extensions/utils/add_data.py @@ -0,0 +1,62 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Utils for adding data to cugraph graphstore objects + + +def _update_feature_map( + pg_feature_map, feat_name_obj, contains_vector_features, columns +): + """ + Update the existing feature map `pg_feature_map` based on `feat_name_obj` + """ + if contains_vector_features: + if feat_name_obj is None: + raise ValueError( + "feature name must be provided when wrapping" + + " multiple columns under a single feature name" + + " or a feature map" + ) + + if isinstance(feat_name_obj, str): + pg_feature_map[feat_name_obj] = columns + + elif isinstance(feat_name_obj, dict): + covered_columns = [] + for col in feat_name_obj.keys(): + current_cols = feat_name_obj[col] + # Handle strings too + if isinstance(current_cols, str): + current_cols = [current_cols] + covered_columns = covered_columns + current_cols + + if set(covered_columns) != set(columns): + raise ValueError( + f"All the columns {columns} not covered in {covered_columns} " + f"Please check the feature_map {feat_name_obj} provided" + ) + + for key, cols in feat_name_obj.items(): + if isinstance(cols, str): + cols = [cols] + pg_feature_map[key] = cols 
+ else: + raise ValueError(f"{feat_name_obj} should be str or dict") + else: + if feat_name_obj: + raise ValueError( + f"feat_name {feat_name_obj} is only valid when " + "wrapping multiple columns under feature names" + ) + for col in columns: + pg_feature_map[col] = [col] diff --git a/python/cugraph/cugraph/gnn/dgl_extensions/utils/sampling.py b/python/cugraph/cugraph/gnn/dgl_extensions/utils/sampling.py new file mode 100644 index 00000000000..0e12371271d --- /dev/null +++ b/python/cugraph/cugraph/gnn/dgl_extensions/utils/sampling.py @@ -0,0 +1,181 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# Utils for sampling on graphstore like objects +import cugraph +import cudf +import cupy as cp +import dask_cudf +from cugraph.experimental import PropertyGraph + +src_n = PropertyGraph.src_col_name +dst_n = PropertyGraph.dst_col_name +type_n = PropertyGraph.type_col_name +eid_n = PropertyGraph.edge_id_col_name +vid_n = PropertyGraph.vertex_col_name + + +def get_subgraph_and_src_range_from_edgelist(edge_list, is_mg, reverse_edges=False): + if reverse_edges: + edge_list = edge_list.rename(columns={src_n: dst_n, dst_n: src_n}) + + subgraph = cugraph.MultiGraph(directed=True) + if is_mg: + # FIXME: Can not switch to renumber = False + # For MNMG Algos + # Remove when https://github.com/rapidsai/cugraph/issues/2437 + # lands + create_subgraph_f = subgraph.from_dask_cudf_edgelist + renumber = True + edge_list = edge_list.persist() + src_range = edge_list[src_n].min().compute(), edge_list[src_n].max().compute() + + else: + # Note: We have to keep renumber = False + # to handle cases when the seed_nodes is not present in subgraph + create_subgraph_f = subgraph.from_cudf_edgelist + renumber = False + src_range = edge_list[src_n].min(), edge_list[src_n].max() + + create_subgraph_f( + edge_list, + source=src_n, + destination=dst_n, + edge_attr=eid_n, + renumber=renumber, + # FIXME: renumber=False is not supported for MNMG algos + legacy_renum_only=True, + ) + + return subgraph, src_range + + +def sample_multiple_sgs( + sgs, + sample_f, + start_list_d, + start_list_dtype, + edge_dir, + fanout, + with_replacement, +): + start_list_types = list(start_list_d.keys()) + output_dfs = [] + for can_etype, (sg, start_list_range) in sgs.items(): + can_etype = _convert_can_etype_s_to_tup(can_etype) + if _edge_types_contains_canonical_etype(can_etype, start_list_types, edge_dir): + if edge_dir == "in": + subset_type = can_etype[2] + else: + subset_type = can_etype[0] + output = sample_single_sg( + sg, + sample_f, + start_list_d[subset_type], + start_list_dtype, + start_list_range, 
+ fanout, + with_replacement, + ) + output_dfs.append(output) + if len(output_dfs) == 0: + empty_df = cudf.DataFrame({"sources": [], "destinations": [], "indices": []}) + return empty_df.astype(cp.int32) + + if isinstance(output_dfs[0], dask_cudf.DataFrame): + return dask_cudf.concat(output_dfs, ignore_index=True) + else: + return cudf.concat(output_dfs, ignore_index=True) + + +def sample_single_sg( + sg, + sample_f, + start_list, + start_list_dtype, + start_list_range, + fanout, + with_replacement, +): + if isinstance(start_list, dict): + start_list = cudf.concat(list(start_list.values())) + + # Uniform sampling fails when the dtype + # of the seed dtype is not same as the node dtype + start_list = start_list.astype(start_list_dtype) + + # Filter start list by ranges + # to ensure the seed is within index values + # see below: + # https://github.com/rapidsai/cugraph/blob/branch-22.12/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh + start_list = start_list[ + (start_list >= start_list_range[0]) & (start_list <= start_list_range[1]) + ] + sampled_df = sample_f( + sg, + start_list=start_list, + fanout_vals=[fanout], + with_replacement=with_replacement, + ) + return sampled_df + + +def _edge_types_contains_canonical_etype(can_etype, edge_types, edge_dir): + src_type, _, dst_type = can_etype + if edge_dir == "in": + return dst_type in edge_types + else: + return src_type in edge_types + + +def _convert_can_etype_s_to_tup(canonical_etype_s): + src_type, etype, dst_type = canonical_etype_s.split(",") + src_type = src_type[2:-1] + dst_type = dst_type[2:-2] + etype = etype[2:-1] + return (src_type, etype, dst_type) + + +def create_dlpack_d(d): + dlpack_d = {} + for k, df in d.items(): + if len(df) == 0: + dlpack_d[k] = (None, None, None) + else: + dlpack_d[k] = ( + df[src_n].to_dlpack(), + df[dst_n].to_dlpack(), + df[eid_n].to_dlpack(), + ) + + return dlpack_d + + +def get_underlying_dtype_from_sg(sg): + """ + Returns the underlying dtype of the subgraph + """ 
+ # FIXME: Remove after we have consistent naming + # https://github.com/rapidsai/cugraph/issues/2618 + sg_columns = sg.edgelist.edgelist_df.columns + if "src" in sg_columns: + # src for single node graph + sg_node_dtype = sg.edgelist.edgelist_df["src"].dtype + elif src_n in sg_columns: + # _SRC_ for multi-node graphs + sg_node_dtype = sg.edgelist.edgelist_df[src_n].dtype + else: + raise ValueError(f"Source column {src_n} not found in the subgraph") + + return sg_node_dtype diff --git a/python/cugraph/cugraph/gnn/graph_store.py b/python/cugraph/cugraph/gnn/graph_store.py index ba2cba0ae32..d673030f5d7 100644 --- a/python/cugraph/cugraph/gnn/graph_store.py +++ b/python/cugraph/cugraph/gnn/graph_store.py @@ -17,9 +17,17 @@ import dask_cudf import cugraph from cugraph.experimental import PropertyGraph, MGPropertyGraph -import cupy as cp from functools import cached_property +from .dgl_extensions.utils.add_data import _update_feature_map +from .dgl_extensions.utils.sampling import sample_multiple_sgs, sample_single_sg +from .dgl_extensions.utils.sampling import ( + get_subgraph_and_src_range_from_edgelist, + get_underlying_dtype_from_sg, +) +from .dgl_extensions.utils.sampling import create_dlpack_d +from .dgl_extensions.feature_storage import CuFeatureStorage + src_n = PropertyGraph.src_col_name dst_n = PropertyGraph.dst_col_name @@ -41,7 +49,7 @@ def __init__(self, graph, backend_lib="torch"): if isinstance(graph, (PropertyGraph, MGPropertyGraph)): self.__G = graph else: - raise ValueError("graph must be a PropertyGraph or" " MGPropertyGraph") + raise ValueError("graph must be a PropertyGraph or MGPropertyGraph") # dict to map column names corresponding to edge features # of each type self.edata_feat_col_d = defaultdict(list) @@ -288,7 +296,7 @@ def sample_neighbors( # Uniform sampling fails when the dtype # of the seed dtype is not same as the node dtype - self.set_sg_node_dtype(list(sgs.values())[0]) + self.set_sg_node_dtype(list(sgs.values())[0][0]) sampled_df = 
sample_multiple_sgs( sgs, sample_f, @@ -300,12 +308,18 @@ def sample_neighbors( ) else: if edge_dir == "in": - sg = self.extracted_reverse_subgraph + sg, start_list_range = self.extracted_reverse_subgraph else: - sg = self.extracted_subgraph + sg, start_list_range = self.extracted_subgraph self.set_sg_node_dtype(sg) sampled_df = sample_single_sg( - sg, sample_f, nodes, self._sg_node_dtype, fanout, replace + sg, + sample_f, + nodes, + self._sg_node_dtype, + start_list_range, + fanout, + replace, ) # we reverse directions when directions=='in' @@ -324,7 +338,7 @@ def sample_neighbors( if self.has_multiple_etypes: # Heterogeneous graph case d = self._get_edgeid_type_d(sampled_df["indices"], self.etypes) - d = return_dlpack_d(d) + d = create_dlpack_d(d) return d else: return ( @@ -358,12 +372,16 @@ def extracted_subgraph(self): edge_list = self.gdata.get_edge_data(columns=[src_n, dst_n, type_n]) edge_list = edge_list.reset_index(drop=True) - return get_subgraph_from_edgelist(edge_list, self.is_mg, reverse_edges=False) + return get_subgraph_and_src_range_from_edgelist( + edge_list, self.is_mg, reverse_edges=False + ) @cached_property def extracted_reverse_subgraph(self): edge_list = self.gdata.get_edge_data(columns=[src_n, dst_n, type_n]) - return get_subgraph_from_edgelist(edge_list, self.is_mg, reverse_edges=True) + return get_subgraph_and_src_range_from_edgelist( + edge_list, self.is_mg, reverse_edges=True + ) @cached_property def extracted_subgraphs_per_type(self): @@ -372,7 +390,7 @@ def extracted_subgraphs_per_type(self): edge_list = self.gdata.get_edge_data( columns=[src_n, dst_n, type_n], types=[etype] ) - sg_d[etype] = get_subgraph_from_edgelist( + sg_d[etype] = get_subgraph_and_src_range_from_edgelist( edge_list, self.is_mg, reverse_edges=False ) return sg_d @@ -384,7 +402,7 @@ def extracted_reverse_subgraphs_per_type(self): edge_list = self.gdata.get_edge_data( columns=[src_n, dst_n, type_n], types=[etype] ) - sg_d[etype] = get_subgraph_from_edgelist( + 
sg_d[etype] = get_subgraph_and_src_range_from_edgelist( edge_list, self.is_mg, reverse_edges=True ) return sg_d @@ -404,18 +422,8 @@ def set_sg_node_dtype(self, sg): if hasattr(self, "_sg_node_dtype"): return self._sg_node_dtype else: - # FIXME: Remove after we have consistent naming - # https://github.com/rapidsai/cugraph/issues/2618 - sg_columns = sg.edgelist.edgelist_df.columns - if "src" in sg_columns: - # src for single node graph - self._sg_node_dtype = sg.edgelist.edgelist_df["src"].dtype - elif src_n in sg_columns: - # _SRC_ for multi-node graphs - self._sg_node_dtype = sg.edgelist.edgelist_df[src_n].dtype - else: - raise ValueError(f"Source column {src_n} not found in the subgraph") - return self._sg_node_dtype + self._sg_node_dtype = get_underlying_dtype_from_sg(sg) + return self._sg_node_dtype def find_edges(self, edge_ids_cap, etype): """Return the source and destination node IDs given the edge IDs within @@ -498,252 +506,3 @@ def __clear_cached_properties(self): if "extracted_reverse_subgraphs_per_type" in self.__dict__: del self.extracted_reverse_subgraphs_per_type - - -class CuFeatureStorage: - """Storage for node/edge feature data. - - Either subclassing this class or implementing the same set of interfaces - is fine. DGL simply uses duck-typing to implement its sampling pipeline. 
- """ - - def __init__( - self, pg, columns, storage_type, backend_lib="torch", indices_offset=0 - ): - self.pg = pg - self.columns = columns - if backend_lib == "torch": - from torch.utils.dlpack import from_dlpack - elif backend_lib == "tf": - from tensorflow.experimental.dlpack import from_dlpack - elif backend_lib == "cupy": - from cupy import from_dlpack - else: - raise NotImplementedError( - "Only pytorch and tensorflow backends are currently supported" - ) - if storage_type not in ["edge", "node"]: - raise NotImplementedError("Only edge and node storage is supported") - - self.storage_type = storage_type - - self.from_dlpack = from_dlpack - self.indices_offset = indices_offset - - def fetch(self, indices, device=None, pin_memory=False, **kwargs): - """Fetch the features of the given node/edge IDs to the - given device. - - Parameters - ---------- - indices : Tensor - Node or edge IDs. - device : Device - Device context. - pin_memory : - - Returns - ------- - Tensor - Feature data stored in PyTorch Tensor. - """ - # Default implementation uses synchronous fetch. 
- - indices = cp.asarray(indices) - if isinstance(self.pg, MGPropertyGraph): - # dask_cudf loc breaks if we provide cudf series/cupy array - # https://github.com/rapidsai/cudf/issues/11877 - indices = indices.get() - else: - indices = cudf.Series(indices) - - indices = indices + self.indices_offset - - if self.storage_type == "node": - subset_df = self.pg.get_vertex_data( - vertex_ids=indices, columns=self.columns - ) - else: - subset_df = self.pg.get_edge_data(edge_ids=indices, columns=self.columns) - - subset_df = subset_df[self.columns] - - if isinstance(subset_df, dask_cudf.DataFrame): - subset_df = subset_df.compute() - - if len(subset_df) == 0: - raise ValueError(f"{indices=} not found in FeatureStorage") - cap = subset_df.to_dlpack() - tensor = self.from_dlpack(cap) - del cap - if device: - if not isinstance(tensor, cp.ndarray): - # Cant transfer to different device for cupy - tensor = tensor.to(device) - return tensor - - -def return_dlpack_d(d): - dlpack_d = {} - for k, df in d.items(): - if len(df) == 0: - dlpack_d[k] = (None, None, None) - else: - dlpack_d[k] = ( - df[src_n].to_dlpack(), - df[dst_n].to_dlpack(), - df[eid_n].to_dlpack(), - ) - - return dlpack_d - - -def sample_single_sg( - sg, sample_f, start_list, start_list_dtype, fanout, with_replacement -): - if isinstance(start_list, dict): - start_list = cudf.concat(list(start_list.values())) - - # Uniform sampling fails when the dtype - # of the seed dtype is not same as the node dtype - start_list = start_list.astype(start_list_dtype) - sampled_df = sample_f( - sg, - start_list=start_list, - fanout_vals=[fanout], - with_replacement=with_replacement, - # FIXME: is_edge_ids=True does not seem to do anything - # issue: https://github.com/rapidsai/cugraph/issues/2562 - ) - return sampled_df - - -def sample_multiple_sgs( - sgs, - sample_f, - start_list_d, - start_list_dtype, - edge_dir, - fanout, - with_replacement, -): - start_list_types = list(start_list_d.keys()) - output_dfs = [] - for can_etype, 
sg in sgs.items(): - can_etype = _convert_can_etype_s_to_tup(can_etype) - if _edge_types_contains_canonical_etype(can_etype, start_list_types, edge_dir): - if edge_dir == "in": - subset_type = can_etype[2] - else: - subset_type = can_etype[0] - - output = sample_single_sg( - sg, - sample_f, - start_list_d[subset_type], - start_list_dtype, - fanout, - with_replacement, - ) - output_dfs.append(output) - - if len(output_dfs) == 0: - empty_df = cudf.DataFrame({"sources": [], "destinations": [], "indices": []}) - return empty_df.astype(cp.int32) - - if isinstance(output_dfs[0], dask_cudf.DataFrame): - return dask_cudf.concat(output_dfs, ignore_index=True) - else: - return cudf.concat(output_dfs, ignore_index=True) - - -def _edge_types_contains_canonical_etype(can_etype, edge_types, edge_dir): - src_type, _, dst_type = can_etype - if edge_dir == "in": - return dst_type in edge_types - else: - return src_type in edge_types - - -def _convert_can_etype_s_to_tup(canonical_etype_s): - src_type, etype, dst_type = canonical_etype_s.split(",") - src_type = src_type[2:-1] - dst_type = dst_type[2:-2] - etype = etype[2:-1] - return (src_type, etype, dst_type) - - -def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False): - if reverse_edges: - edge_list = edge_list.rename(columns={src_n: dst_n, dst_n: src_n}) - - subgraph = cugraph.MultiGraph(directed=True) - if is_mg: - # FIXME: Can not switch to renumber = False - # For MNMG Algos - # Remove when https://github.com/rapidsai/cugraph/issues/2437 - # lands - create_subgraph_f = subgraph.from_dask_cudf_edgelist - renumber = True - else: - # Note: We have to keep renumber = False - # to handle cases when the seed_nodes is not present in sugraph - create_subgraph_f = subgraph.from_cudf_edgelist - renumber = False - - create_subgraph_f( - edge_list, - source=src_n, - destination=dst_n, - edge_attr=eid_n, - renumber=renumber, - # FIXME: renumber=False is not supported for MNMG algos - legacy_renum_only=True, - ) - - return 
subgraph - - -def _update_feature_map( - pg_feature_map, feat_name_obj, contains_vector_features, columns -): - if contains_vector_features: - if feat_name_obj is None: - raise ValueError( - "feature name must be provided when wrapping" - + " multiple columns under a single feature name" - + " or a feature map" - ) - - if isinstance(feat_name_obj, str): - pg_feature_map[feat_name_obj] = columns - - elif isinstance(feat_name_obj, dict): - covered_columns = [] - for col in feat_name_obj.keys(): - current_cols = feat_name_obj[col] - # Handle strings too - if isinstance(current_cols, str): - current_cols = [current_cols] - covered_columns = covered_columns + current_cols - - if set(covered_columns) != set(columns): - raise ValueError( - f"All the columns {columns} not covered in {covered_columns} " - f"Please check the feature_map {feat_name_obj} provided" - ) - - for key, cols in feat_name_obj.items(): - if isinstance(cols, str): - cols = [cols] - pg_feature_map[key] = cols - else: - raise ValueError(f"{feat_name_obj} should be str or dict") - else: - if feat_name_obj: - raise ValueError( - f"feat_name {feat_name_obj} is only valid when " - "wrapping multiple columns under feature names" - ) - for col in columns: - pg_feature_map[col] = [col] diff --git a/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py b/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py index 13908e52e0b..60ae2978bc5 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py @@ -444,6 +444,48 @@ def test_sampling_gs_heterogeneous_out_dir(gs_heterogeneous_dgl_eg): cudf.testing.assert_frame_equal(output_df, expected_df) +def test_sampling_with_out_of_index_seed(dask_client): + pg = MGPropertyGraph() + gs = CuGraphStore(pg) + node_df = cudf.DataFrame() + node_df["node_id"] = cudf.Series([0, 1, 2, 3, 4, 5]).astype("int32") + node_df = dask_cudf.from_cudf(node_df, npartitions=2) + gs.add_node_data(node_df, 
"node_id", "_N") + + edge_df = cudf.DataFrame() + edge_df["src"] = cudf.Series([0, 1, 2]).astype("int32") + edge_df["dst"] = cudf.Series([0, 0, 0]).astype("int32") + edge_df = dask_cudf.from_cudf(edge_df, npartitions=2) + gs.add_edge_data(edge_df, ["src", "dst"], canonical_etype="('_N', 'con.a', '_N')") + + edge_df = cudf.DataFrame() + edge_df["src"] = cudf.Series([3, 4, 5]).astype("int32") + edge_df["dst"] = cudf.Series([3, 3, 3]).astype("int32") + edge_df = dask_cudf.from_cudf(edge_df, npartitions=2) + gs.add_edge_data(edge_df, ["src", "dst"], canonical_etype="('_N', 'con.b', '_N')") + + output = gs.sample_neighbors( + {"_N": cudf.Series([0, 1, 3, 5], "int32").to_dlpack()}, fanout=3 + ) + output_e1 = ( + cudf.from_dlpack(output["('_N', 'con.a', '_N')"][0]) + .sort_values() + .reset_index(drop=True) + ) + output_e2 = ( + cudf.from_dlpack(output["('_N', 'con.b', '_N')"][0]) + .sort_values() + .reset_index(drop=True) + ) + + cudf.testing.assert_series_equal( + output_e1, cudf.Series([0, 1, 2], dtype="int32", name=0) + ) + cudf.testing.assert_series_equal( + output_e2, cudf.Series([3, 4, 5], dtype="int32", name=0) + ) + + # Util to help testing def get_cudf_ser_from_cap_tup(cap_t): src_id, dst_id, e_id = cap_t diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index 5b00749e4cd..1b76744d393 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -813,6 +813,45 @@ def test_add_node_data_vector_feats(): cp.testing.assert_array_equal(out_vec, exp_vec) +def test_sampling_with_out_of_index_seed(): + pg = PropertyGraph() + gs = CuGraphStore(pg) + node_df = cudf.DataFrame() + node_df["node_id"] = cudf.Series([0, 1, 2, 3, 4, 5]).astype("int32") + gs.add_node_data(node_df, "node_id", "_N") + + edge_df = cudf.DataFrame() + edge_df["src"] = cudf.Series([0, 1, 2]).astype("int32") + edge_df["dst"] = cudf.Series([0, 0, 0]).astype("int32") + 
gs.add_edge_data(edge_df, ["src", "dst"], canonical_etype="('_N', 'con.a', '_N')") + + edge_df = cudf.DataFrame() + edge_df["src"] = cudf.Series([3, 4, 5]).astype("int32") + edge_df["dst"] = cudf.Series([3, 3, 3]).astype("int32") + gs.add_edge_data(edge_df, ["src", "dst"], canonical_etype="('_N', 'con.b', '_N')") + + output = gs.sample_neighbors( + {"_N": cudf.Series([0, 1, 3, 5], "int32").to_dlpack()}, fanout=3 + ) + output_e1 = ( + cudf.from_dlpack(output["('_N', 'con.a', '_N')"][0]) + .sort_values() + .reset_index(drop=True) + ) + output_e2 = ( + cudf.from_dlpack(output["('_N', 'con.b', '_N')"][0]) + .sort_values() + .reset_index(drop=True) + ) + + cudf.testing.assert_series_equal( + output_e1, cudf.Series([0, 1, 2], dtype="int32", name=0) + ) + cudf.testing.assert_series_equal( + output_e2, cudf.Series([3, 4, 5], dtype="int32", name=0) + ) + + def assert_correct_eids(edge_df, sample_edge_id_df): # We test that all src, dst correspond to the correct # eids in the sample_edge_id_df