From 2a6c9cfbe0b2ac51f63c22dc0161469693a544cc Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Tue, 9 Aug 2022 16:13:29 -0700 Subject: [PATCH 1/8] PropertyGraph set index to vertex and edge ids Currently, this only does SG version for #2401. MG is still TODO. This also doesn't change anything user-facing (yet). --- .../cugraph/structure/property_graph.py | 158 +++++++++++------- 1 file changed, 99 insertions(+), 59 deletions(-) diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 09c7f6b0040..09b1d95a08f 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -64,7 +64,6 @@ class EXPERIMENTAL__PropertyGraph: dst_col_name = "_DST_" type_col_name = "_TYPE_" edge_id_col_name = "_EDGE_ID_" - vertex_id_col_name = "_VERTEX_ID_" weight_col_name = "_WEIGHT_" _default_type_name = "" @@ -144,16 +143,15 @@ def __init__(self): @property def edges(self): if self.__edge_prop_dataframe is not None: - return self.__edge_prop_dataframe[[self.src_col_name, - self.dst_col_name, - self.edge_id_col_name]] + return self.__edge_prop_dataframe[ + [self.src_col_name, self.dst_col_name] + ].reset_index() return None @property def vertex_property_names(self): if self.__vertex_prop_dataframe is not None: props = list(self.__vertex_prop_dataframe.columns) - props.remove(self.vertex_col_name) props.remove(self.type_col_name) # should "type" be removed? return props return [] @@ -164,7 +162,6 @@ def edge_property_names(self): props = list(self.__edge_prop_dataframe.columns) props.remove(self.src_col_name) props.remove(self.dst_col_name) - props.remove(self.edge_id_col_name) props.remove(self.type_col_name) # should "type" be removed? 
if self.weight_col_name in props: props.remove(self.weight_col_name) @@ -406,6 +403,8 @@ def add_vertex_data(self, self.__vertex_prop_dataframe = self.__update_dataframe_dtypes( self.__vertex_prop_dataframe, {self.vertex_col_name: dataframe[vertex_col_name].dtype}) + self.__vertex_prop_dataframe.set_index(self.vertex_col_name, + inplace=True) # Ensure that both the predetermined vertex ID column name and vertex # type column name are present for proper merging. @@ -435,13 +434,26 @@ def add_vertex_data(self, tmp_df, self.__vertex_prop_dataframe) self.__vertex_prop_dtypes.update(new_col_info) + # Join on shared columns and the indices + tmp_df.set_index(self.vertex_col_name, inplace=True) + cols = ( + self.__vertex_prop_dataframe.columns.intersection(tmp_df.columns) + .to_list() + ) + cols.append(self.vertex_col_name) self.__vertex_prop_dataframe = \ - self.__vertex_prop_dataframe.merge(tmp_df, how="outer") + self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") # Update the vertex eval dict with the latest column instances - latest = dict([(n, self.__vertex_prop_dataframe[n]) - for n in self.__vertex_prop_dataframe.columns]) + if self.__series_type is cudf.Series: + latest = {n: self.__vertex_prop_dataframe[n] + for n in self.__vertex_prop_dataframe.columns} + else: + latest = self.__vertex_prop_dataframe.to_dict('series') self.__vertex_prop_eval_dict.update(latest) + self.__vertex_prop_eval_dict[self.vertex_col_name] = ( + self.__vertex_prop_dataframe.index + ) def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ @@ -450,11 +462,10 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ if self.__vertex_prop_dataframe is not None: if vertex_ids is not None: - df_mask = ( - self.__vertex_prop_dataframe[self.vertex_col_name] - .isin(vertex_ids) - ) - df = self.__vertex_prop_dataframe.loc[df_mask] + if not isinstance(vertex_ids, + (list, slice, self.__series_type)): + vertex_ids = list(vertex_ids) + df = 
self.__vertex_prop_dataframe.loc[vertex_ids] else: df = self.__vertex_prop_dataframe @@ -466,12 +477,11 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): # The "internal" pG.vertex_col_name and pG.type_col_name columns # are also included/added since they are assumed to be needed by # the caller. - if columns is None: - return df - else: + if columns is not None: # FIXME: invalid columns will result in a KeyError, should a # check be done here and a more PG-specific error raised? - return df[[self.vertex_col_name, self.type_col_name] + columns] + df = df[[self.type_col_name] + columns] + return df.reset_index() return None @@ -553,7 +563,6 @@ def add_edge_data(self, default_edge_columns = [self.src_col_name, self.dst_col_name, - self.edge_id_col_name, self.type_col_name] if self.__edge_prop_dataframe is None: self.__edge_prop_dataframe = \ @@ -565,8 +574,8 @@ def add_edge_data(self, self.__edge_prop_dataframe = self.__update_dataframe_dtypes( self.__edge_prop_dataframe, {self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - self.edge_id_col_name: "Int64"}) + self.dst_col_name: dataframe[vertex_col_names[1]].dtype}) + self.__edge_prop_dataframe.index.name = self.edge_id_col_name # NOTE: This copies the incoming DataFrame in order to add the new # columns. The copied DataFrame is then merged (another copy) and then @@ -578,14 +587,18 @@ def add_edge_data(self, # Add unique edge IDs to the new rows. This is just a count for each # row starting from the last edge ID value, with initial edge ID 0. 
- starting_eid = ( - -1 if self.__last_edge_id is None else self.__last_edge_id - ) - tmp_df[self.edge_id_col_name] = 1 - tmp_df[self.edge_id_col_name] = ( - tmp_df[self.edge_id_col_name].cumsum() + starting_eid + start_eid = ( + 0 if self.__last_edge_id is None else self.__last_edge_id ) - self.__last_edge_id = starting_eid + len(tmp_df.index) + end_eid = start_eid + len(tmp_df) # exclusive + if self.__series_type is cudf.Series: + index_class = cudf.RangeIndex + else: + index_class = pd.RangeIndex + tmp_df.index = index_class(start_eid, end_eid, + name=self.edge_id_col_name) + + self.__last_edge_id = end_eid if property_columns: # all columns @@ -604,13 +617,25 @@ def add_edge_data(self, tmp_df, self.__edge_prop_dataframe) self.__edge_prop_dtypes.update(new_col_info) + # Join on shared columns and the indices + cols = ( + self.__edge_prop_dataframe.columns.intersection(tmp_df.columns) + .to_list() + ) + cols.append(self.edge_id_col_name) self.__edge_prop_dataframe = \ - self.__edge_prop_dataframe.merge(tmp_df, how="outer") + self.__edge_prop_dataframe.merge(tmp_df, on=cols, how="outer") - # Update the vertex eval dict with the latest column instances - latest = dict([(n, self.__edge_prop_dataframe[n]) - for n in self.__edge_prop_dataframe.columns]) + # Update the edge eval dict with the latest column instances + if self.__series_type is cudf.Series: + latest = {n: self.__edge_prop_dataframe[n] + for n in self.__edge_prop_dataframe.columns} + else: + latest = self.__edge_prop_dataframe.to_dict('series') self.__edge_prop_eval_dict.update(latest) + self.__edge_prop_eval_dict[self.edge_id_col_name] = ( + self.__edge_prop_dataframe.index + ) def get_edge_data(self, edge_ids=None, types=None, columns=None): """ @@ -619,9 +644,10 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): """ if self.__edge_prop_dataframe is not None: if edge_ids is not None: - df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\ - .isin(edge_ids) - df = 
self.__edge_prop_dataframe.loc[df_mask] + if not isinstance(edge_ids, + (list, slice, self.__series_type)): + edge_ids = list(edge_ids) + df = self.__edge_prop_dataframe.loc[edge_ids] else: df = self.__edge_prop_dataframe @@ -637,13 +663,13 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): all_columns = list(self.__edge_prop_dataframe.columns) if self.weight_col_name in all_columns: all_columns.remove(self.weight_col_name) - return df[all_columns] + df = df[all_columns] else: # FIXME: invalid columns will result in a KeyError, should a # check be done here and a more PG-specific error raised? - return df[[self.src_col_name, self.dst_col_name, - self.edge_id_col_name, self.type_col_name] - + columns] + df = df[[self.src_col_name, self.dst_col_name, + self.type_col_name] + columns] + return df.reset_index() return None @@ -682,16 +708,13 @@ def select_vertices(self, expr, from_previous_selection=None): (from_previous_selection.vertex_selections is not None): previously_selected_rows = self.__vertex_prop_dataframe[ from_previous_selection.vertex_selections] - verts_from_previously_selected_rows = \ - previously_selected_rows[self.vertex_col_name] - # get all the rows from the entire __vertex_prop_dataframe that - # contain those verts - rows_with_verts = \ - self.__vertex_prop_dataframe[self.vertex_col_name]\ - .isin(verts_from_previously_selected_rows) - rows_to_eval = self.__vertex_prop_dataframe[rows_with_verts] + + rows_to_eval = self.__vertex_prop_dataframe.loc[ + previously_selected_rows.index] + locals = dict([(n, rows_to_eval[n]) for n in rows_to_eval.columns]) + locals[self.vertex_col_name] = rows_to_eval.index else: locals = self.__vertex_prop_eval_dict @@ -705,8 +728,10 @@ def select_vertices(self, expr, from_previous_selection=None): # __vertex_prop_dataframe to determine which rows to use when creating # a Graph from a query. 
if num_rows != len(selected_col): - selected_col = selected_col.reindex(range(num_rows), copy=False) - selected_col.fillna(False, inplace=True) + selected_col = selected_col.reindex( + self.__vertex_prop_dataframe.index, + fill_value=False, + copy=False) return EXPERIMENTAL__PropertySelection( vertex_selection_series=selected_col) @@ -823,12 +848,21 @@ def extract_subgraph(self, # selected verts in both src and dst if (selected_vertex_dataframe is not None) and \ not(selected_vertex_dataframe.empty): - selected_verts = selected_vertex_dataframe[self.vertex_col_name] has_srcs = selected_edge_dataframe[self.src_col_name]\ - .isin(selected_verts) + .isin(selected_vertex_dataframe.index) has_dsts = selected_edge_dataframe[self.dst_col_name]\ - .isin(selected_verts) + .isin(selected_vertex_dataframe.index) edges = selected_edge_dataframe[has_srcs & has_dsts] + # Alternative to benchmark + # edges = selected_edge_dataframe.merge( + # selected_vertex_dataframe[[]], + # left_on=self.src_col_name, + # right_index=True, + # ).merge( + # selected_vertex_dataframe[[]], + # left_on=self.dst_col_name, + # right_index=True, + # ) else: edges = selected_edge_dataframe @@ -893,11 +927,19 @@ def annotate_dataframe(self, df, G, edge_vertex_col_names): else: raise AttributeError("Graph G does not have attribute 'edge_data'") + # Join on shared columns and the indices + cols = ( + self.__edge_prop_dataframe.columns + .intersection(edge_info_df.columns) + .to_list() + ) + cols.append(self.edge_id_col_name) + # New result includes only properties from the src/dst edges identified # by edge IDs. All other data in df is merged based on src/dst values. # NOTE: results from MultiGraph graphs will have to include edge IDs! edge_props_df = edge_info_df.merge(self.__edge_prop_dataframe, - how="inner") + on=cols, how="inner") # FIXME: also allow edge ID col to be passed in and renamed. 
new_df = df.rename(columns={src_col_name: self.src_col_name, @@ -995,9 +1037,9 @@ def edge_props_to_graph(self, "renumber": renumber_graph, } if type(edge_prop_df) is cudf.DataFrame: - G.from_cudf_edgelist(edge_prop_df, **create_args) + G.from_cudf_edgelist(edge_prop_df.reset_index(), **create_args) else: - G.from_pandas_edgelist(edge_prop_df, **create_args) + G.from_pandas_edgelist(edge_prop_df.reset_index(), **create_args) if add_edge_data: # Set the edge_data on the resulting Graph to a DataFrame @@ -1033,10 +1075,8 @@ def __create_property_lookup_table(self, edge_prop_df): """ src = edge_prop_df[self.src_col_name] dst = edge_prop_df[self.dst_col_name] - edge_id = edge_prop_df[self.edge_id_col_name] return self.__dataframe_type({self.src_col_name: src, - self.dst_col_name: dst, - self.edge_id_col_name: edge_id}) + self.dst_col_name: dst}).reset_index() def __get_all_vertices_series(self): """ @@ -1047,7 +1087,7 @@ def __get_all_vertices_series(self): epd = self.__edge_prop_dataframe vert_sers = [] if vpd is not None: - vert_sers.append(vpd[self.vertex_col_name]) + vert_sers.append(vpd.index.to_series()) if epd is not None: vert_sers.append(epd[self.src_col_name]) vert_sers.append(epd[self.dst_col_name]) From 4c93f776fa56e16850a9a399f8c4fea11e605cd6 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Wed, 10 Aug 2022 10:55:43 -0700 Subject: [PATCH 2/8] Update graph_store --- python/cugraph/cugraph/gnn/graph_store.py | 32 ++++--------------- .../cugraph/cugraph/tests/test_graph_store.py | 8 +++++ 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/python/cugraph/cugraph/gnn/graph_store.py b/python/cugraph/cugraph/gnn/graph_store.py index 0b40cc3bf0a..6e8f36bffa3 100644 --- a/python/cugraph/cugraph/gnn/graph_store.py +++ b/python/cugraph/cugraph/gnn/graph_store.py @@ -84,9 +84,7 @@ def get_node_storage(self, key, ntype=None): ) ) ntype = ntypes[0] - # FIXME: Remove once below lands - # https://github.com/rapidsai/cugraph/pull/2444 - df = 
self.gdata._vertex_prop_dataframe + df = self.gdata.get_vertex_data() col_names = self.ndata_key_col_d[key] return CuFeatureStorage( df=df, @@ -109,9 +107,7 @@ def get_edge_storage(self, key, etype=None): etype = etypes[0] col_names = self.edata_key_col_d[key] - # FIXME: Remove once below lands - # https://github.com/rapidsai/cugraph/pull/2444 - df = self.gdata._edge_prop_dataframe + df = self.gdata.get_edge_data() return CuFeatureStorage( df=df, id_col=eid_n, @@ -128,19 +124,11 @@ def num_edges(self, etype=None): @property def ntypes(self): - # FIXME: Remove once below is fixed - # https://github.com/rapidsai/cugraph/issues/2423 - s = self.gdata._vertex_prop_dataframe[type_n] - ntypes = s.drop_duplicates().to_arrow().to_pylist() - return ntypes + return sorted(self.gdata.vertex_types) @property def etypes(self): - # FIXME: Remove once below is fixed - # https://github.com/rapidsai/cugraph/issues/2423 - s = self.gdata._edge_prop_dataframe[type_n] - ntypes = s.drop_duplicates().to_arrow().to_pylist() - return ntypes + return sorted(self.gdata.edge_types) @property def ndata(self): @@ -253,9 +241,7 @@ def sample_neighbors( columns={"sources": src_n, "destinations": dst_n}, inplace=True ) - # FIXME: Remove once below lands - # https://github.com/rapidsai/cugraph/issues/2444 - edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n, eid_n]] + edge_df = self.gdata.edges sampled_df = edge_df.merge(sampled_df) return ( @@ -269,6 +255,7 @@ def extracted_reverse_subgraph_without_renumbering(self): # TODO: Switch to extract_subgraph based on response on # https://github.com/rapidsai/cugraph/issues/2458 subset_df = self.gdata._edge_prop_dataframe[[src_n, dst_n]] + subset_df.reset_index(drop=True, inplace=True) # drop edge ids subset_df.rename(columns={src_n: dst_n, dst_n: src_n}, inplace=True) subset_df["weight"] = cp.float32(1.0) subgraph = cugraph.Graph(directed=True) @@ -307,12 +294,7 @@ def find_edges(self, edge_ids_cap, etype): The dst nodes for the given ids """ 
edge_ids = cudf.from_dlpack(edge_ids_cap) - - # FIXME: Remove once below lands - # https://github.com/rapidsai/cugraph/issues/2444 - edge_df = self.gdata._edge_prop_dataframe[[src_n, dst_n, - eid_n, type_n]] - + edge_df = self.gdata.get_edge_data(columns=[]) subset_df = get_subset_df( edge_df, PropertyGraph.edge_id_col_name, edge_ids, etype ) diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index 3c7a7262025..b92fafdfb32 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -387,6 +387,14 @@ def test_num_edges(dataset1_CuGraphStore): assert dataset1_CuGraphStore.num_edges() == 14 +def test_etypes(dataset1_CuGraphStore): + assert dataset1_CuGraphStore.etypes == ['referrals', 'relationships', 'transactions'] + + +def test_ntypes(dataset1_CuGraphStore): + assert dataset1_CuGraphStore.ntypes == ['merchant', 'taxpayers', 'user'] + + def test_get_node_storage_gs(dataset1_CuGraphStore): fs = dataset1_CuGraphStore.get_node_storage( key="merchant_k", ntype="merchant" From 26317152febd22b2630d5689d62bcab430437df4 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Wed, 10 Aug 2022 11:13:06 -0700 Subject: [PATCH 3/8] flake8 --- python/cugraph/cugraph/tests/test_graph_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index b92fafdfb32..6a44a9f6e3d 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -388,7 +388,9 @@ def test_num_edges(dataset1_CuGraphStore): def test_etypes(dataset1_CuGraphStore): - assert dataset1_CuGraphStore.etypes == ['referrals', 'relationships', 'transactions'] + assert dataset1_CuGraphStore.etypes == [ + 'referrals', 'relationships', 'transactions' + ] def test_ntypes(dataset1_CuGraphStore): From 99c2e0ecc3db08ca6adee23cce27e60aace2ff45 
Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Wed, 14 Sep 2022 10:37:58 -0700 Subject: [PATCH 4/8] Set index to vertex or edge IDs in PG for MG This includes a slow workaround for rapidsai/cudf#11550 --- .../dask/structure/mg_property_graph.py | 120 ++++++++++++------ .../cugraph/structure/property_graph.py | 1 + 2 files changed, 81 insertions(+), 40 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 42627711220..ac7db2ca946 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -58,7 +58,6 @@ class EXPERIMENTAL__MGPropertyGraph: dst_col_name = "_DST_" type_col_name = "_TYPE_" edge_id_col_name = "_EDGE_ID_" - vertex_id_col_name = "_VERTEX_ID_" weight_col_name = "_WEIGHT_" _default_type_name = "" @@ -139,16 +138,15 @@ def __init__(self, num_workers=None): @property def edges(self): if self.__edge_prop_dataframe is not None: - return self.__edge_prop_dataframe[[self.src_col_name, - self.dst_col_name, - self.edge_id_col_name]] + return self.__edge_prop_dataframe[ + [self.src_col_name, self.dst_col_name] + ].reset_index() return None @property def vertex_property_names(self): if self.__vertex_prop_dataframe is not None: props = list(self.__vertex_prop_dataframe.columns) - props.remove(self.vertex_col_name) props.remove(self.type_col_name) # should "type" be removed? return props return [] @@ -159,7 +157,6 @@ def edge_property_names(self): props = list(self.__edge_prop_dataframe.columns) props.remove(self.src_col_name) props.remove(self.dst_col_name) - props.remove(self.edge_id_col_name) props.remove(self.type_col_name) # should "type" be removed? 
if self.weight_col_name in props: props.remove(self.weight_col_name) @@ -394,6 +391,8 @@ def add_vertex_data(self, self.__update_dataframe_dtypes( self.__vertex_prop_dataframe, {self.vertex_col_name: dataframe[vertex_col_name].dtype}) + self.__vertex_prop_dataframe = \ + self.__vertex_prop_dataframe.set_index(self.vertex_col_name) # Ensure that both the predetermined vertex ID column name and vertex # type column name are present for proper merging. @@ -423,13 +422,30 @@ def add_vertex_data(self, tmp_df, self.__vertex_prop_dataframe) self.__vertex_prop_dtypes.update(new_col_info) - self.__vertex_prop_dataframe = \ - self.__vertex_prop_dataframe.merge(tmp_df, how="outer") - self.__vertex_prop_dataframe.reset_index() + # Join on shared columns and the indices + tmp_df = tmp_df.set_index(self.vertex_col_name) + cols = ( + self.__vertex_prop_dataframe.columns.intersection(tmp_df.columns) + .to_list() + ) + cols.append(self.vertex_col_name) + # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 + self.__vertex_prop_dataframe = ( + self.__vertex_prop_dataframe + .reset_index() + .merge(tmp_df.reset_index(), on=cols, how='outer') + .set_index(self.vertex_col_name) + ) + # self.__vertex_prop_dataframe = \ + # self.__vertex_prop_dataframe.merge(tmp_df, on=cols, how="outer") + # Update the vertex eval dict with the latest column instances - latest = dict([(n, self.__vertex_prop_dataframe[n]) - for n in self.__vertex_prop_dataframe.columns]) + latest = {n: self.__vertex_prop_dataframe[n] + for n in self.__vertex_prop_dataframe.columns} self.__vertex_prop_eval_dict.update(latest) + self.__vertex_prop_eval_dict[self.vertex_col_name] = ( + self.__vertex_prop_dataframe.index + ) def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ @@ -438,11 +454,10 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): """ if self.__vertex_prop_dataframe is not None: if vertex_ids is not None: - df_mask = ( - 
self.__vertex_prop_dataframe[self.vertex_col_name] - .isin(vertex_ids) - ) - df = self.__vertex_prop_dataframe.loc[df_mask] + if not isinstance(vertex_ids, + (list, slice, self.__series_type)): + vertex_ids = list(vertex_ids) + df = self.__vertex_prop_dataframe.loc[vertex_ids] else: df = self.__vertex_prop_dataframe @@ -454,12 +469,11 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): # The "internal" pG.vertex_col_name and pG.type_col_name columns # are also included/added since they are assumed to be needed by # the caller. - if columns is None: - return df - else: + if columns is not None: # FIXME: invalid columns will result in a KeyError, should a # check be done here and a more PG-specific error raised? - return df[[self.vertex_col_name, self.type_col_name] + columns] + df = df[[self.type_col_name] + columns] + return df.reset_index() return None @@ -530,15 +544,14 @@ def add_edge_data(self, default_edge_columns = [self.src_col_name, self.dst_col_name, - self.edge_id_col_name, self.type_col_name] if self.__edge_prop_dataframe is None: temp_dataframe = cudf.DataFrame(columns=default_edge_columns) self.__update_dataframe_dtypes( temp_dataframe, {self.src_col_name: dataframe[vertex_col_names[0]].dtype, - self.dst_col_name: dataframe[vertex_col_names[1]].dtype, - self.edge_id_col_name: "Int64"}) + self.dst_col_name: dataframe[vertex_col_names[1]].dtype}) + temp_dataframe.index.name = self.edge_id_col_name self.__edge_prop_dataframe = \ dask_cudf.from_cudf(temp_dataframe, npartitions=self.__num_workers) @@ -552,6 +565,7 @@ def add_edge_data(self, # Add unique edge IDs to the new rows. This is just a count for each # row starting from the last edge ID value, with initial edge ID 0. + # FIXME: can we assign index instead of column? 
starting_eid = ( -1 if self.__last_edge_id is None else self.__last_edge_id ) @@ -559,8 +573,9 @@ def add_edge_data(self, tmp_df[self.edge_id_col_name] = ( tmp_df[self.edge_id_col_name].cumsum() + starting_eid ) + tmp_df = tmp_df.set_index(self.edge_id_col_name) + tmp_df = tmp_df.persist() self.__last_edge_id = starting_eid + len(tmp_df.index) - tmp_df.persist() if property_columns: # all columns @@ -579,13 +594,29 @@ def add_edge_data(self, tmp_df, self.__edge_prop_dataframe) self.__edge_prop_dtypes.update(new_col_info) - self.__edge_prop_dataframe = \ - self.__edge_prop_dataframe.merge(tmp_df, how="outer") + # Join on shared columns and the indices + cols = ( + self.__edge_prop_dataframe.columns.intersection(tmp_df.columns) + .to_list() + ) + cols.append(self.edge_id_col_name) + # FIXME: workaround for: https://github.com/rapidsai/cudf/issues/11550 + self.__edge_prop_dataframe = ( + self.__edge_prop_dataframe + .reset_index() + .merge(tmp_df.reset_index(), on=cols, how='outer') + .set_index(self.edge_id_col_name) + ) + # self.__edge_prop_dataframe = \ + # self.__edge_prop_dataframe.merge(tmp_df, on=cols, how="outer") - # Update the vertex eval dict with the latest column instances + # Update the edge eval dict with the latest column instances latest = dict([(n, self.__edge_prop_dataframe[n]) for n in self.__edge_prop_dataframe.columns]) self.__edge_prop_eval_dict.update(latest) + self.__edge_prop_eval_dict[self.edge_id_col_name] = ( + self.__edge_prop_dataframe.index + ) def get_edge_data(self, edge_ids=None, types=None, columns=None): """ @@ -594,9 +625,10 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): """ if self.__edge_prop_dataframe is not None: if edge_ids is not None: - df_mask = self.__edge_prop_dataframe[self.edge_id_col_name]\ - .isin(edge_ids) - df = self.__edge_prop_dataframe.loc[df_mask] + if not isinstance(edge_ids, + (list, slice, self.__series_type)): + edge_ids = list(edge_ids) + df = self.__edge_prop_dataframe.loc[edge_ids] 
else: df = self.__edge_prop_dataframe @@ -612,13 +644,13 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): all_columns = list(self.__edge_prop_dataframe.columns) if self.weight_col_name in all_columns: all_columns.remove(self.weight_col_name) - return df[all_columns] + df = df[all_columns] else: # FIXME: invalid columns will result in a KeyError, should a # check be done here and a more PG-specific error raised? - return df[[self.src_col_name, self.dst_col_name, - self.edge_id_col_name, self.type_col_name] - + columns] + df = df[[self.src_col_name, self.dst_col_name, + self.type_col_name] + columns] + return df.reset_index() return None @@ -737,12 +769,21 @@ def extract_subgraph(self, # selected verts in both src and dst if (selected_vertex_dataframe is not None) and \ not(selected_vertex_dataframe.empty): - selected_verts = selected_vertex_dataframe[self.vertex_col_name] has_srcs = selected_edge_dataframe[self.src_col_name]\ - .isin(selected_verts) + .isin(selected_vertex_dataframe.index) has_dsts = selected_edge_dataframe[self.dst_col_name]\ - .isin(selected_verts) + .isin(selected_vertex_dataframe.index) edges = selected_edge_dataframe[has_srcs & has_dsts] + # Alternative to benchmark + # edges = selected_edge_dataframe.merge( + # selected_vertex_dataframe[[]], + # left_on=self.src_col_name, + # right_index=True, + # ).merge( + # selected_vertex_dataframe[[]], + # left_on=self.dst_col_name, + # right_index=True, + # ) else: edges = selected_edge_dataframe @@ -898,8 +939,7 @@ def __create_property_lookup_table(self, edge_prop_df): values from edge_prop_df. 
""" return edge_prop_df[[self.src_col_name, - self.dst_col_name, - self.edge_id_col_name]] + self.dst_col_name]].reset_index() def __get_all_vertices_series(self): """ @@ -910,7 +950,7 @@ def __get_all_vertices_series(self): epd = self.__edge_prop_dataframe vert_sers = [] if vpd is not None: - vert_sers.append(vpd[self.vertex_col_name]) + vert_sers.append(vpd.index.to_series()) if epd is not None: vert_sers.append(epd[self.src_col_name]) vert_sers.append(epd[self.dst_col_name]) diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 4e2dddec672..9954b8a0b3d 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -263,6 +263,7 @@ def get_num_vertices(self, type=None, *, include_edge_data=True): else: self.__num_vertices = pd.concat(vert_sers).nunique() return self.__num_vertices + value_counts = self._vertex_type_value_counts if type == self._default_type_name and include_edge_data: # The default type, "", can refer to both vertex and edge data From 9bbf0488bfa0b856c1c45b7ca444e60146c34455 Mon Sep 17 00:00:00 2001 From: Alexandria Barghi Date: Wed, 21 Sep 2022 17:53:07 +0000 Subject: [PATCH 5/8] fixes --- .../cugraph/dask/structure/mg_property_graph.py | 1 + .../gnn/pyg_extensions/data/cugraph_store.py | 17 +++++++++-------- .../cugraph/tests/mg/test_mg_pyg_extensions.py | 13 ++++++------- .../cugraph/tests/test_pyg_extensions.py | 9 ++++----- 4 files changed, 20 insertions(+), 20 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index ac7db2ca946..89d54c08383 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -256,6 +256,7 @@ def get_num_vertices(self, type=None, *, include_edge_data=True): vert_sers = self.__get_all_vertices_series() if vert_sers: if 
self.__series_type is dask_cudf.Series: + print([(x,x.dtype) for x in vert_sers]) vert_count = dask_cudf.concat(vert_sers).nunique() self.__num_vertices = vert_count.compute() return self.__num_vertices diff --git a/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py b/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py index 39805241426..b87f12c0779 100644 --- a/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py +++ b/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py @@ -205,14 +205,14 @@ def __init__(self, G, backend='torch'): srcs = srcs.compute() dst_types = self.__graph.get_vertex_data( - vertex_ids=dsts, + vertex_ids=dsts.values_host, columns=[self.__graph.type_col_name] )[self.__graph.type_col_name].unique() src_types = self.__graph.get_vertex_data( - vertex_ids=srcs, - columns=['_TYPE_'] - )._TYPE_.unique() + vertex_ids=srcs.values_host, + columns=[self.__graph.type_col_name] + )[self.__graph.type_col_name].unique() if self.is_mg: dst_types = dst_types.compute() @@ -434,8 +434,9 @@ def neighbor_sample( ).unique() noi = self.__graph.get_vertex_data( - nodes_of_interest.compute() if self.is_mg else nodes_of_interest, - columns=[self.__graph.vertex_col_name, self.__graph.type_col_name] + nodes_of_interest.compute().values_host if self.is_mg + else nodes_of_interest, + columns=[self.__graph.type_col_name] ) noi_types = noi[self.__graph.type_col_name].unique() @@ -595,13 +596,13 @@ def _get_tensor(self, attr): if len(self.__graph.vertex_types) == 1: # make sure we don't waste computation if there's only 1 type df = self.__graph.get_vertex_data( - vertex_ids=idx, + vertex_ids=idx.get(), types=None, columns=cols ) else: df = self.__graph.get_vertex_data( - vertex_ids=idx, + vertex_ids=idx.get(), types=[attr.group_name], columns=cols ) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_pyg_extensions.py b/python/cugraph/cugraph/tests/mg/test_mg_pyg_extensions.py index 436b7b193fc..a646837b398 100644 --- 
a/python/cugraph/cugraph/tests/mg/test_mg_pyg_extensions.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_pyg_extensions.py @@ -249,13 +249,13 @@ def multi_edge_multi_vertex_property_graph_1(dask_client): 3, 4 ], dtype='int32'), - 'vertex_type': [ + 'vertex_type': cudf.Series([ 'brown', 'brown', 'brown', 'black', 'black', - ] + ], dtype=str) }), npartitions=2 ) @@ -444,8 +444,7 @@ def test_neighbor_sample(single_vertex_graph): for node_type, node_ids in noi_groups.items(): actual_vertex_ids = pG.get_vertex_data( - types=[node_type], - columns=[pG.vertex_col_name] + types=[node_type] )[pG.vertex_col_name].compute().to_cupy() assert list(node_ids) == list(actual_vertex_ids) @@ -518,7 +517,7 @@ def test_get_tensor(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name] @@ -548,7 +547,7 @@ def test_multi_get_tensor(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name] @@ -592,7 +591,7 @@ def test_get_tensor_size(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name] diff --git a/python/cugraph/cugraph/tests/test_pyg_extensions.py b/python/cugraph/cugraph/tests/test_pyg_extensions.py index 99ae1e0a132..2c0859ff4f7 100644 --- a/python/cugraph/cugraph/tests/test_pyg_extensions.py +++ b/python/cugraph/cugraph/tests/test_pyg_extensions.py @@ -425,8 +425,7 @@ def test_neighbor_sample(single_vertex_graph): for node_type, node_ids in noi_groups.items(): actual_vertex_ids = pG.get_vertex_data( - types=[node_type], - columns=[pG.vertex_col_name] + types=[node_type] )[pG.vertex_col_name].to_cupy() 
assert list(node_ids) == list(actual_vertex_ids) @@ -483,7 +482,7 @@ def test_get_tensor(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name].to_cupy() @@ -510,7 +509,7 @@ def test_multi_get_tensor(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name].to_cupy() @@ -551,7 +550,7 @@ def test_get_tensor_size(graph): if property_name != 'vertex_type': base_series = pG.get_vertex_data( types=[vertex_type], - columns=[property_name, pG.vertex_col_name] + columns=[property_name] ) vertex_ids = base_series[pG.vertex_col_name].to_cupy() From 1feb5e88b7baf67b4b67308fe5ef1cfb99c1a08b Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Fri, 23 Sep 2022 11:29:32 -0700 Subject: [PATCH 6/8] Add workaround when index dtypes are different in dask_cudf.concat --- .../dask/structure/mg_property_graph.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 25327a4fed4..cff2731772f 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -256,8 +256,8 @@ def get_num_vertices(self, type=None, *, include_edge_data=True): vert_sers = self.__get_all_vertices_series() if vert_sers: if self.__series_type is dask_cudf.Series: - print([(x,x.dtype) for x in vert_sers]) - vert_count = dask_cudf.concat(vert_sers).nunique() + vert_count = dask_cudf.concat( + vert_sers, ignore_index=True).nunique() self.__num_vertices = vert_count.compute() return self.__num_vertices @@ -306,7 +306,9 @@ def get_vertices(self, selection=None): vert_sers = 
self.__get_all_vertices_series() if vert_sers: if self.__series_type is dask_cudf.Series: - return self.__series_type(dask_cudf.concat(vert_sers).unique()) + return self.__series_type( + dask_cudf.concat(vert_sers, ignore_index=True).unique() + ) else: raise TypeError("dataframe must be a CUDF Dask dataframe.") return self.__series_type() @@ -1051,6 +1053,20 @@ def __get_all_vertices_series(self): if epd is not None: vert_sers.append(epd[self.src_col_name]) vert_sers.append(epd[self.dst_col_name]) + # `dask_cudf.concat` doesn't work when the index dtypes are different + # See: https://github.com/rapidsai/cudf/issues/11741 + if len(vert_sers) > 1 and not all( + cudf.api.types.is_dtype_equal( + vert_sers[0].index.dtype, s.index.dtype + ) + for s in vert_sers + ): + # Cast all indices to the dtype of the first series + first, *rest = vert_sers + dtype = first.index.dtype + for s in rest: + if s.index.dtype != dtype: + s.index = s.index.astype(dtype) return vert_sers @staticmethod From 21a3b89cdb9fdf63516931d9e59207b7a127fc39 Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Fri, 23 Sep 2022 14:31:48 -0700 Subject: [PATCH 7/8] flake8 --- python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py b/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py index b87f12c0779..f8983f2829f 100644 --- a/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py +++ b/python/cugraph/cugraph/gnn/pyg_extensions/data/cugraph_store.py @@ -435,7 +435,7 @@ def neighbor_sample( noi = self.__graph.get_vertex_data( nodes_of_interest.compute().values_host if self.is_mg - else nodes_of_interest, + else nodes_of_interest, columns=[self.__graph.type_col_name] ) From aa591b88bbfe83f3fc949bcd94d612acda6e7dbe Mon Sep 17 00:00:00 2001 From: Erik Welch Date: Fri, 30 Sep 2022 10:18:37 -0700 Subject: [PATCH 8/8] Add test to ensure ids are fetched in correct order (and when repeated) --- 
.../tests/mg/test_mg_property_graph.py | 22 +++++++++++++ .../cugraph/tests/test_property_graph.py | 31 +++++++++++++++---- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py index 790b14f2246..d9045b2f1c8 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_property_graph.py @@ -627,6 +627,28 @@ def test_get_edge_data(dataset1_MGPropertyGraph): assert_frame_equal(df1, df2, check_like=True) +def test_get_edge_data_repeated(dask_client): + from cugraph.experimental import MGPropertyGraph + + df = cudf.DataFrame( + {"src": [1, 1, 1, 2], "dst": [2, 3, 4, 1], "edge_feat": [0, 1, 2, 3]} + ) + df = dask_cudf.from_cudf(df, npartitions=2) + pg = MGPropertyGraph() + pg.add_edge_data(df, vertex_col_names=['src', 'dst']) + df1 = pg.get_edge_data(edge_ids=[2, 1, 3, 1], columns=['edge_feat']) + df1 = df1.compute() + expected = cudf.DataFrame({ + "_EDGE_ID_": [2, 1, 3, 1], + "_SRC_": [1, 1, 2, 1], + "_DST_": [4, 3, 1, 3], + "_TYPE_": ["", "", "", ""], + "edge_feat": [2, 1, 3, 1], + }) + df1["_TYPE_"] = df1["_TYPE_"].astype(str) # Undo categorical + assert_frame_equal(df1, expected) + + def test_get_data_empty_graphs(dask_client): """ Ensures that calls to pG.get_*_data() on an empty pG are handled correctly. 
diff --git a/python/cugraph/cugraph/tests/test_property_graph.py b/python/cugraph/cugraph/tests/test_property_graph.py index e819183c676..f08818a7610 100644 --- a/python/cugraph/cugraph/tests/test_property_graph.py +++ b/python/cugraph/cugraph/tests/test_property_graph.py @@ -531,9 +531,6 @@ def test_edges_attr(dataset2_simple_PropertyGraph): assert edge_ids.nunique() == expected_num_edges -@pytest.mark.skip(reason="This is failing in 22.10 CI with the following " - "error: TypeError: only list-like objects are " - "allowed to be passed to isin(), you passed a [int]") def test_get_vertex_data(dataset1_PropertyGraph): """ Ensure PG.get_vertex_data() returns the correct data based on vertex IDs @@ -605,9 +602,6 @@ def test_get_vertex_data(dataset1_PropertyGraph): # assert_frame_equal(df1, df2, check_like=True) -@pytest.mark.skip(reason="This is failing in 22.10 CI with the following " - "error: TypeError: only list-like objects are " - "allowed to be passed to isin(), you passed a [int]") def test_get_edge_data(dataset1_PropertyGraph): """ Ensure PG.get_edge_data() returns the correct data based on edge IDs passed @@ -676,6 +670,31 @@ def test_get_edge_data(dataset1_PropertyGraph): # assert_frame_equal(df1, df2, check_like=True) +@pytest.mark.parametrize("df_type", df_types, ids=df_type_id) +def test_get_edge_data_repeated(df_type): + from cugraph.experimental import PropertyGraph + + df = df_type( + {"src": [1, 1, 1, 2], "dst": [2, 3, 4, 1], "edge_feat": np.arange(4)} + ) + pg = PropertyGraph() + pg.add_edge_data(df, vertex_col_names=['src', 'dst']) + df1 = pg.get_edge_data(edge_ids=[2, 1, 3, 1], columns=['edge_feat']) + expected = df_type({ + "_EDGE_ID_": [2, 1, 3, 1], + "_SRC_": [1, 1, 2, 1], + "_DST_": [4, 3, 1, 3], + "_TYPE_": ["", "", "", ""], + "edge_feat": [2, 1, 3, 1], + }) + df1["_TYPE_"] = df1["_TYPE_"].astype(str) # Undo categorical + if df_type is cudf.DataFrame: + afe = assert_frame_equal + else: + afe = pd.testing.assert_frame_equal + afe(df1, 
expected) + + @pytest.mark.parametrize("df_type", df_types, ids=df_type_id) def test_null_data(df_type): """