Skip to content

Commit

Permalink
Update python WCC to leverage the CAPI (#2866)
Browse files Browse the repository at this point in the history
The CAPI has a `weakly connected component` version that can be leverage by the PLC and python API. This PR:
1. Add a PLC implementation of `WCC`
2. Update the python API to leverage the `PLC` API
3. Remove the legacy implementation
4. Update both the python and the `PLC` tests
5. Add `PLC` API to to create a graph from a `CSR `representation
6. Update docstrings 

closes #2509 
closes #2496

Authors:
  - Joseph Nke (https://github.com/jnke2016)

Approvers:
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2866
  • Loading branch information
jnke2016 authored Nov 14, 2022
1 parent 3803cf8 commit 1f346de
Show file tree
Hide file tree
Showing 19 changed files with 655 additions and 527 deletions.
1 change: 0 additions & 1 deletion python/cugraph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ add_subdirectory(cugraph/components)
add_subdirectory(cugraph/cores)
add_subdirectory(cugraph/dask/centrality)
add_subdirectory(cugraph/dask/comms)
add_subdirectory(cugraph/dask/components)
add_subdirectory(cugraph/dask/structure)
add_subdirectory(cugraph/generators)
add_subdirectory(cugraph/internals)
Expand Down
10 changes: 1 addition & 9 deletions python/cugraph/cugraph/components/connectivity.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -23,17 +23,9 @@ from cugraph.structure.graph_utilities cimport *
cdef extern from "cugraph/algorithms.hpp" namespace "cugraph":

ctypedef enum cugraph_cc_t:
CUGRAPH_WEAK "cugraph::cugraph_cc_t::CUGRAPH_WEAK"
CUGRAPH_STRONG "cugraph::cugraph_cc_t::CUGRAPH_STRONG"
NUM_CONNECTIVITY_TYPES "cugraph::cugraph_cc_t::NUM_CONNECTIVITY_TYPES"

cdef void connected_components[VT,ET,WT](
const GraphCSRView[VT,ET,WT] &graph,
cugraph_cc_t connect_type,
VT *labels) except +

cdef extern from "cugraph/utilities/cython.hpp" namespace "cugraph::cython":
cdef void call_wcc[vertex_t, weight_t](
const handle_t &handle,
const graph_container_t &g,
vertex_t *identifiers) except +
23 changes: 21 additions & 2 deletions python/cugraph/cugraph/components/connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
)
from cugraph.structure import Graph, DiGraph
from cugraph.components import connectivity_wrapper
import cudf
from pylibcugraph import weakly_connected_components as pylibcugraph_wcc
from pylibcugraph import ResourceHandle


def _ensure_args(api_name, G, directed, connection, return_labels):
Expand Down Expand Up @@ -112,7 +115,7 @@ def weakly_connected_components(G, directed=None, connection=None, return_labels
The adjacency list will be computed if not already present. The number
of vertices should fit into a 32b int.
directed : bool, optional (default=True)
directed : bool, optional (default=None)
NOTE
For non-Graph-type (eg. sparse matrix) values of G only.
Expand Down Expand Up @@ -171,6 +174,7 @@ def weakly_connected_components(G, directed=None, connection=None, return_labels
>>> df = cugraph.weakly_connected_components(G)
"""

(directed, connection, return_labels) = _ensure_args(
"weakly_connected_components", G, directed, connection, return_labels
)
Expand All @@ -180,7 +184,22 @@ def weakly_connected_components(G, directed=None, connection=None, return_labels
G, nx_weight_attr="weight", matrix_graph_type=Graph(directed=directed)
)

df = connectivity_wrapper.weakly_connected_components(G)
if G.is_directed():
raise ValueError("input graph must be undirected")

vertex, labels = pylibcugraph_wcc(
resource_handle=ResourceHandle(),
graph=G._plc_graph,
offsets=None,
indices=None,
weights=None,
labels=None,
do_expensive_check=False,
)

df = cudf.DataFrame()
df["vertex"] = vertex
df["labels"] = labels

if G.renumbered:
df = G.unrenumber(df, "vertex")
Expand Down
63 changes: 1 addition & 62 deletions python/cugraph/cugraph/components/connectivity_wrapper.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -27,67 +27,6 @@ from cugraph.structure.graph_classes import Graph as type_Graph
import cudf
import numpy as np

def weakly_connected_components(input_graph):
"""
Call connected_components
"""

cdef unique_ptr[handle_t] handle_ptr
handle_ptr.reset(new handle_t())
handle_ = handle_ptr.get()

numberTypeMap = {np.dtype("int32") : <int>numberTypeEnum.int32Type,
np.dtype("int64") : <int>numberTypeEnum.int64Type,
np.dtype("float32") : <int>numberTypeEnum.floatType,
np.dtype("double") : <int>numberTypeEnum.doubleType}

[src, dst] = graph_primtypes_wrapper.datatype_cast([input_graph.edgelist.edgelist_df['src'],
input_graph.edgelist.edgelist_df['dst']],
[np.int32])
if type(input_graph) is not type_Graph:
#
# Need to create a symmetrized COO for this local
# computation

src, dst = symmetrize(src, dst)
weight_t = np.dtype("float32")
weights = None

num_verts = input_graph.number_of_vertices()
num_edges = input_graph.number_of_edges(directed_edges=True)

df = cudf.DataFrame()
df['vertex'] = cudf.Series(np.arange(num_verts, dtype=np.int32))
df['labels'] = cudf.Series(np.zeros(num_verts, dtype=np.int32))

cdef uintptr_t c_src_vertices = src.__cuda_array_interface__['data'][0]
cdef uintptr_t c_dst_vertices = dst.__cuda_array_interface__['data'][0]
cdef uintptr_t c_edge_weights = <uintptr_t>NULL
cdef uintptr_t c_labels_val = df['labels'].__cuda_array_interface__['data'][0];

cdef graph_container_t graph_container
populate_graph_container(graph_container,
handle_[0],
<void*>c_src_vertices, <void*>c_dst_vertices, <void*>c_edge_weights,
<void*>NULL,
<void*>NULL,
0,
<numberTypeEnum>(<int>(numberTypeEnum.int32Type)),
<numberTypeEnum>(<int>(numberTypeEnum.int32Type)),
<numberTypeEnum>(<int>(numberTypeMap[weight_t])),
num_edges,
num_verts, num_edges,
False,
True,
False,
False)

call_wcc[int, float](handle_ptr.get()[0],
graph_container,
<int*> c_labels_val)

return df


def strongly_connected_components(input_graph):
"""
Expand Down
125 changes: 66 additions & 59 deletions python/cugraph/cugraph/dask/components/connectivity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -10,43 +11,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from dask.distributed import wait, default_client
from cugraph.dask.common.input_utils import (
get_distributed_data,
get_vertex_partition_offsets,
)
from cugraph.dask.components import mg_connectivity_wrapper as mg_connectivity
from dask.distributed import wait
import cugraph.dask.comms.comms as Comms
import dask_cudf
import cudf

from pylibcugraph import ResourceHandle
from pylibcugraph import weakly_connected_components as pylibcugraph_wcc

def call_wcc(
sID,
data,
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
aggregate_segment_offsets,
):
wid = Comms.get_worker_id(sID)
handle = Comms.get_handle(sID)
local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID)
segment_offsets = aggregate_segment_offsets[
local_size * wid : local_size * (wid + 1)
]
return mg_connectivity.mg_wcc(
data[0],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
wid,
handle,
segment_offsets,

def convert_to_cudf(cp_arrays):
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
"""
cupy_vertex, cupy_labels = cp_arrays
df = cudf.DataFrame()
df["vertex"] = cupy_vertex
df["labels"] = cupy_labels

return df


def _call_plc_wcc(sID, mg_graph_x, do_expensive_check):
return pylibcugraph_wcc(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
graph=mg_graph_x,
offsets=None,
indices=None,
weights=None,
labels=None,
do_expensive_check=do_expensive_check,
)


Expand All @@ -57,10 +53,21 @@ def weakly_connected_components(input_graph):
Parameters
----------
input_graph : cugraph.Graph, networkx.Graph, CuPy or SciPy sparse matrix
Graph or matrix object, which should contain the connectivity
information
input_graph : cugraph.Graph
The graph descriptor should contain the connectivity information
and weights. The adjacency list will be computed if not already
present.
The current implementation only supports undirected graphs.
Returns
-------
result : dask_cudf.DataFrame
GPU distributed data frame containing 2 dask_cudf.Series
ddf['vertex']: dask_cudf.Series
Contains the vertex identifiers
ddf['labels']: dask_cudf.Series
Contains the wcc labels
Examples
--------
Expand All @@ -74,45 +81,45 @@ def weakly_connected_components(input_graph):
... chunksize=chunksize, delimiter=" ",
... names=["src", "dst", "value"],
... dtype=["int32", "int32", "float32"])
>>> dg = cugraph.Graph(directed=True)
>>> dg = cugraph.Graph(directed=False)
>>> dg.from_dask_cudf_edgelist(ddf, source='src', destination='dst',
... edge_attr='value')
>>> result = dcg.weakly_connected_components(dg)
"""

client = default_client()

input_graph.compute_renumber_edge_list()
if input_graph.is_directed():
raise ValueError("input graph must be undirected")

ddf = input_graph.edgelist.edgelist_df
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]
num_edges = len(ddf)
data = get_distributed_data(ddf)
# Initialize dask client
client = input_graph._client

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
do_expensive_check = False

result = [
client.submit(
call_wcc,
_call_plc_wcc,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_verts,
num_edges,
vertex_partition_offsets,
input_graph.aggregate_segment_offsets,
workers=[wf[0]],
input_graph._plc_graph[w],
do_expensive_check,
workers=[w],
allow_other_workers=False,
)
for idx, wf in enumerate(data.worker_to_parts.items())
for w in Comms.get_workers()
]

wait(result)
ddf = dask_cudf.from_delayed(result)

cudf_result = [client.submit(convert_to_cudf, cp_arrays) for cp_arrays in result]

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result).persist()
wait(ddf)
# Wait until the inactive futures are released
wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)])

if input_graph.renumbered:
return input_graph.unrenumber(ddf, "vertex")
ddf = input_graph.unrenumber(ddf, "vertex")

return ddf
26 changes: 0 additions & 26 deletions python/cugraph/cugraph/dask/components/mg_connectivity.pxd

This file was deleted.

Loading

0 comments on commit 1f346de

Please sign in to comment.