Undirected graph support for MG graphs (#2247)
This PR allows our MG implementation to support undirected graphs by symmetrizing the dataframe, making it consistent with the SG implementation.

It also:
- replaces the deprecated `cudf` call `append` with `concat`
- updates the neighborhood sampling docstring and code to only support `int32` vertices and labels (by Dylan)

closes #2240
closes #2174 
closes #2264 by raising a `ValueError` and updating the docstring
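
As a usage sketch (the edge data, column names, and cluster setup here are illustrative, not from this PR), an undirected MG graph can now be built directly from a dask_cudf edge list and is symmetrized internally:

```python
import cudf
import dask_cudf
import cugraph

# Assumes a Dask cluster/client and cugraph Comms have already been
# initialized, as required for MG execution.
df = cudf.DataFrame({"src": [0, 1, 2], "dst": [1, 2, 0]})
ddf = dask_cudf.from_cudf(df, npartitions=2)

# With this PR, directed=False symmetrizes the MG edge list,
# matching the SG behavior.
G = cugraph.Graph(directed=False)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")
```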

Authors:
  - Joseph Nke (https://github.com/jnke2016)
  - https://github.com/betochimas

Approvers:
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2247
jnke2016 authored May 11, 2022
1 parent 403797f commit e906c98
Showing 8 changed files with 197 additions and 155 deletions.
6 changes: 0 additions & 6 deletions python/cugraph/cugraph/dask/link_analysis/hits.py
@@ -170,12 +170,6 @@ def hits(input_graph, tol=1.0e-5, max_iter=100, nstart=None, normalized=True):
src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

-    # FIXME Move this call to the function creating a directed
-    # graph from a dask dataframe because duplicated edges need
-    # to be dropped
-    ddf = ddf.map_partitions(
-        lambda df: df.drop_duplicates(subset=[src_col_name, dst_col_name]))

num_edges = len(ddf)
data = get_distributed_data(ddf)

10 changes: 8 additions & 2 deletions python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py
@@ -99,12 +99,12 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,
cuGraph graph, which contains connectivity information as dask cudf
edge list dataframe
-    start_info_list : tuple of list or cudf.Series
+    start_info_list : tuple of list or cudf.Series (int32)
Tuple of a list of starting vertices for sampling, along with a
        corresponding list of labels for reorganizing results after sending
the input to different callers.
-    fanout_vals : list
+    fanout_vals : list (int32)
List of branching out (fan-out) degrees per starting vertex for each
hop level.
@@ -135,8 +135,14 @@ def EXPERIMENTAL__uniform_neighborhood(input_graph,

if isinstance(start_list, list):
start_list = cudf.Series(start_list)
+    if start_list.dtype != 'int32':
+        raise ValueError(f"'start_list' must have int32 values, "
+                         f"got: {start_list.dtype}")
if isinstance(info_list, list):
info_list = cudf.Series(info_list)
+    if info_list.dtype != 'int32':
+        raise ValueError(f"'info_list' must have int32 values, "
+                         f"got: {info_list.dtype}")
# fanout_vals must be a host array!
# FIXME: ensure other sequence types (eg. cudf Series) can be handled.
if isinstance(fanout_vals, list):
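
In practice, callers must now pass `int32` start vertices and labels explicitly, since a default-constructed series is `int64` and is rejected with a `ValueError`. A small sketch of the expected inputs (values are illustrative):

```python
import cudf

# cudf.Series([0, 5, 7]) would default to int64 and now raise
# a ValueError; the dtype must be int32.
start_list = cudf.Series([0, 5, 7], dtype="int32")
info_list = cudf.Series([0, 0, 1], dtype="int32")
```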
13 changes: 10 additions & 3 deletions python/cugraph/cugraph/structure/graph_classes.py
@@ -16,6 +16,7 @@
simpleDistributedGraphImpl,
npartiteGraphImpl)
import cudf
+import dask_cudf
import warnings

from cugraph.utilities.utils import import_optional
@@ -24,9 +25,15 @@


# TODO: Move to utilities
-def null_check(col):
-    if col.null_count != 0:
-        raise ValueError("Series contains NULL values")
+def null_check(input_data):
+    # input_data can be cudf.Series, cudf.DataFrame, dask_cudf.Series
+    # and dask_cudf.DataFrame
+    has_null = input_data.isna().values.any()
+    if isinstance(input_data, (dask_cudf.Series, dask_cudf.DataFrame)):
+        has_null = has_null.compute()
+
+    if has_null:
+        raise ValueError("Series/DataFrame contains NULL values")


class Graph:
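
A sketch of how the reworked `null_check` behaves on the two backends (data is illustrative, and the function is assumed to be in scope):

```python
import cudf
import dask_cudf

null_check(cudf.Series([1, 2, 3]))  # passes silently

ds = dask_cudf.from_cudf(cudf.Series([1, None, 3]), npartitions=2)
null_check(ds)  # raises ValueError; the lazy any() is computed first
```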
@@ -14,6 +14,7 @@
from cugraph.structure import graph_primtypes_wrapper
from cugraph.structure.graph_primtypes_wrapper import Direction
from cugraph.structure.number_map import NumberMap
+from cugraph.structure.symmetrize import symmetrize
import cudf
import dask_cudf

@@ -81,17 +82,41 @@ def __from_edgelist(
"and destination parameters"
)
ddf_columns = s_col + d_col

        # The dataframe will be symmetrized iff the graph is undirected;
        # otherwise, the initial dataframe will be returned
if edge_attr is not None:
if not (set([edge_attr]).issubset(set(input_ddf.columns))):
raise ValueError(
"edge_attr column name not found in input."
"Recheck the edge_attr parameter")
self.properties.weighted = True
ddf_columns = ddf_columns + [edge_attr]
input_ddf = input_ddf[ddf_columns]
input_ddf = input_ddf.rename(columns={edge_attr: 'value'})
source_col, dest_col, value_col = symmetrize(
input_ddf, source, destination, 'value',
multi=self.properties.multi_edge,
symmetrize=not self.properties.directed)
else:
input_ddf = input_ddf[ddf_columns]
source_col, dest_col = symmetrize(
input_ddf, source, destination,
multi=self.properties.multi_edge,
symmetrize=not self.properties.directed)

if isinstance(source_col, dask_cudf.Series):
# Create a dask_cudf dataframe from the cudf series obtained
# from symmetrization
input_ddf = source_col.to_frame()
input_ddf = input_ddf.rename(columns={source_col.name: source})
input_ddf[destination] = dest_col
else:
# Multi column dask_cudf dataframe
input_ddf = dask_cudf.concat([source_col, dest_col], axis=1)

if edge_attr is not None:
input_ddf = input_ddf.rename(columns={edge_attr: 'value'})
input_ddf['value'] = value_col

self.input_df = input_ddf

#
# Keep all of the original parameters so we can lazily
@@ -100,7 +125,6 @@ def __from_edgelist(

# FIXME: Edge Attribute not handled
self.properties.renumbered = renumber
-        self.input_df = input_ddf
self.source_columns = source
self.destination_columns = destination

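
Conceptually, the symmetrization applied for undirected graphs appends the reverse of every edge and then de-duplicates. A simplified single-GPU sketch of the idea (not the actual `symmetrize` implementation, which also handles weights, multi-edges, and dask_cudf inputs):

```python
import cudf

def symmetrize_sketch(df, src, dst):
    # Append each edge's reverse, then drop duplicate pairs
    # (a real multigraph would keep the duplicates).
    rev = df.rename(columns={src: dst, dst: src})
    both = cudf.concat([df, rev[df.columns]])
    return both.drop_duplicates(subset=[src, dst]).reset_index(drop=True)
```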
@@ -106,7 +106,17 @@ def __from_edgelist(
"names not found in input. Recheck the source and "
"destination parameters"
)
df_columns = s_col + d_col

if edge_attr is not None:
if not (set([edge_attr]).issubset(set(input_df.columns))):
raise ValueError(
"edge_attr column name not found in input."
"Recheck the edge_attr parameter")
self.properties.weighted = True
df_columns.append(edge_attr)

input_df = input_df[df_columns]
# FIXME: check if the consolidated graph fits on the
# device before gathering all the edge lists

@@ -146,20 +156,13 @@ def __from_edgelist(
if type(source) is list and type(destination) is list:
raise ValueError("set renumber to True for multi column ids")

-        # Populate graph edgelist
-        source_col = elist[source]
-        dest_col = elist[destination]
-
-        if edge_attr is not None:
-            self.properties.weighted = True
-            value_col = elist[edge_attr]
-        else:
-            value_col = None

        # The dataframe will be symmetrized iff the graph is undirected;
        # otherwise, the initial dataframe will be returned. Duplicated edges
        # will be dropped unless the graph is a MultiGraph (not implemented yet)
        # TODO: Update Symmetrize to work on Graph and/or DataFrame
-        if value_col is not None:
+        if edge_attr is not None:
source_col, dest_col, value_col = symmetrize(
-                source_col, dest_col, value_col,
+                elist, source, destination, edge_attr,
multi=self.properties.multi_edge,
symmetrize=not self.properties.directed)
if isinstance(value_col, cudf.DataFrame):
Expand All @@ -168,8 +171,9 @@ def __from_edgelist(
value_dict[i] = value_col[i]
value_col = value_dict
else:
+            value_col = None
source_col, dest_col = symmetrize(
-                source_col, dest_col, multi=self.properties.multi_edge,
+                elist, source, destination, multi=self.properties.multi_edge,
symmetrize=not self.properties.directed)

self.edgelist = simpleGraphImpl.EdgeList(source_col, dest_col,
@@ -754,10 +758,10 @@ def to_undirected(self, G):
df = self.edgelist.edgelist_df
if self.edgelist.weights:
source_col, dest_col, value_col = symmetrize(
df["src"], df["dst"], df["weights"]
df, 'src', 'dst', 'weights'
)
else:
source_col, dest_col = symmetrize(df["src"], df["dst"])
source_col, dest_col = symmetrize(df, 'src', "dst")
value_col = None
G.edgelist = simpleGraphImpl.EdgeList(source_col, dest_col,
value_col)
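
The `symmetrize` signature change visible in these hunks is the key API shift: it now takes a dataframe plus column names instead of individual columns. A before/after sketch drawn from the diff above:

```python
# Before: per-column Series arguments
# source_col, dest_col, value_col = symmetrize(
#     df["src"], df["dst"], df["weights"])

# After: a DataFrame plus column names
source_col, dest_col, value_col = symmetrize(df, "src", "dst", "weights")
```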
25 changes: 12 additions & 13 deletions python/cugraph/cugraph/structure/number_map.py
@@ -135,6 +135,7 @@ def add_internal_vertex_id(self, df, id_column_name, col_names,
return ret

def indirection_map(self, df, src_col_names, dst_col_names):
+        # src_col_names and dst_col_names are lists
tmp_df = cudf.DataFrame()

tmp = (
@@ -155,11 +156,13 @@ def indirection_map(self, df, src_col_names, dst_col_names):
.count()
.reset_index()
)
-            for newname, oldname in zip(self.col_names, dst_col_names):
-                tmp_df[newname] = tmp[newname].append(tmp_dst[oldname])
+            # Both dataframes need the same column names before they
+            # can be concatenated
+            tmp_dst.columns = tmp.columns
+            tmp_df = cudf.concat([tmp, tmp_dst])
else:
-            for newname in self.col_names:
-                tmp_df[newname] = tmp[newname]
+            newname = self.col_names
+            tmp_df = tmp[newname]

tmp_df = tmp_df.groupby(self.col_names).count().reset_index()
tmp_df["id"] = tmp_df.index.astype(self.id_type)
@@ -266,16 +269,12 @@ def indirection_map(self, ddf, src_col_names, dst_col_names):
.count()
.reset_index()
)
-            for i, (newname, oldname) in enumerate(zip(self.col_names,
-                                                       dst_col_names)):
-                if i == 0:
-                    tmp_df = tmp[newname].append(tmp_dst[oldname]).\
-                        to_frame(name=newname)
-                else:
-                    tmp_df[newname] = tmp[newname].append(tmp_dst[oldname])
+            tmp_dst.columns = tmp.columns
+            tmp_df = dask_cudf.concat([tmp, tmp_dst])

else:
-            for newname in self.col_names:
-                tmp_df[newname] = tmp[newname]
+            newname = self.col_names
+            tmp_df = tmp[newname]
tmp_ddf = tmp_df.groupby(self.col_names).count().reset_index()

# Set global index
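
The append-to-concat migration applied throughout this file follows the standard cudf deprecation pattern; a minimal sketch:

```python
import cudf

a = cudf.Series([1, 2])
b = cudf.Series([3, 4])

# Deprecated: a.append(b)
combined = cudf.concat([a, b], ignore_index=True)

# For dataframes, column names must match before concatenation,
# hence the `tmp_dst.columns = tmp.columns` above.
```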