Commit: Migrate SG and MG BFS to pylibcugraph (#2284)
alexbarghi-nv authored May 23, 2022
1 parent 1693a12 commit 5f72b19
Showing 10 changed files with 380 additions and 437 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -88,3 +88,4 @@ python/cugraph/cugraph/tests/dask-worker-space

# Sphinx docs & build artifacts
docs/cugraph/source/api_docs/api/*

226 changes: 121 additions & 105 deletions python/cugraph/cugraph/dask/traversal/bfs.py
@@ -13,45 +13,75 @@
# limitations under the License.
#

-from collections.abc import Iterable
+from pylibcugraph.experimental import (MGGraph,
+                                       ResourceHandle,
+                                       GraphProperties,
+                                       )
+from pylibcugraph import bfs as pylibcugraph_bfs

from dask.distributed import wait, default_client
-from cugraph.dask.common.input_utils import (get_distributed_data,
-                                             get_vertex_partition_offsets)
-from cugraph.dask.traversal import mg_bfs_wrapper as mg_bfs
+from cugraph.dask.common.input_utils import get_distributed_data
import cugraph.dask.comms.comms as Comms
import cudf
import dask_cudf


-def call_bfs(sID,
-             data,
-             src_col_name,
-             dst_col_name,
-             num_verts,
-             num_edges,
-             vertex_partition_offsets,
-             aggregate_segment_offsets,
-             start,
-             depth_limit,
-             return_distances):
-    wid = Comms.get_worker_id(sID)
-    handle = Comms.get_handle(sID)
-    local_size = len(aggregate_segment_offsets) // Comms.get_n_workers(sID)
-    segment_offsets = \
-        aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
-    return mg_bfs.mg_bfs(data[0],
-                         src_col_name,
-                         dst_col_name,
-                         num_verts,
-                         num_edges,
-                         vertex_partition_offsets,
-                         wid,
-                         handle,
-                         segment_offsets,
-                         start,
-                         depth_limit,
-                         return_distances)
+def _call_plc_mg_bfs(
+        sID,
+        data,
+        sources,
+        depth_limit,
+        src_col_name,
+        dst_col_name,
+        graph_properties,
+        num_edges,
+        direction_optimizing=False,
+        do_expensive_check=False,
+        return_predecessors=True):
+    comms_handle = Comms.get_handle(sID)
+    resource_handle = ResourceHandle(comms_handle.getHandle())
+
+    srcs = cudf.Series(data[0][src_col_name], dtype='int32')
+    dsts = cudf.Series(data[0][dst_col_name], dtype='int32')
+    weights = cudf.Series(data[0]['value'], dtype='float32') \
+        if 'value' in data[0].columns \
+        else cudf.Series((srcs + 1) / (srcs + 1), dtype='float32')
+
+    mg = MGGraph(
+        resource_handle=resource_handle,
+        graph_properties=graph_properties,
+        src_array=srcs,
+        dst_array=dsts,
+        weight_array=weights,
+        store_transposed=False,
+        num_edges=num_edges,
+        do_expensive_check=do_expensive_check
+    )
+
+    res = \
+        pylibcugraph_bfs(
+            resource_handle,
+            mg,
+            cudf.Series(sources, dtype='int32'),
+            direction_optimizing,
+            depth_limit if depth_limit is not None else 0,
+            return_predecessors,
+            True
+        )
+
+    return res


+def convert_to_cudf(cp_arrays):
+    """
+    create a cudf DataFrame from cupy arrays
+    """
+    cupy_distances, cupy_predecessors, cupy_vertices = cp_arrays
+    df = cudf.DataFrame()
+    df["vertex"] = cupy_vertices
+    df["distance"] = cupy_distances
+    df["predecessor"] = cupy_predecessors
+    return df


def bfs(input_graph,
@@ -115,95 +145,81 @@ def bfs(input_graph,

    input_graph.compute_renumber_edge_list(transposed=False)
    ddf = input_graph.edgelist.edgelist_df
-    vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
-    num_verts = vertex_partition_offsets.iloc[-1]
+
+    graph_properties = GraphProperties(
+        is_multigraph=False)
+
    num_edges = len(ddf)
    data = get_distributed_data(ddf)

+    src_col_name = input_graph.renumber_map.renumbered_src_col_name
+    dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
+    renumber_ddf = input_graph.renumber_map.implementation.ddf
+    col_names = input_graph.renumber_map.implementation.col_names
+
+    if isinstance(start, dask_cudf.DataFrame) \
+            or isinstance(start, cudf.DataFrame):
+        tmp_df = start
+        tmp_col_names = start.columns
+    else:
+        tmp_df = cudf.DataFrame()
+        tmp_df["0"] = cudf.Series(start)
+        tmp_col_names = ["0"]
+
+    original_start_len = len(tmp_df)
+
+    tmp_ddf = tmp_df[tmp_col_names].rename(
+        columns=dict(zip(tmp_col_names, col_names)))
+    for name in col_names:
+        tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype)
+    renumber_data = get_distributed_data(renumber_ddf)

    def df_merge(df, tmp_df, tmp_col_names):
        x = df[0].merge(tmp_df, on=tmp_col_names, how='inner')
        return x['global_id']

-    if input_graph.renumbered:
-        src_col_name = input_graph.renumber_map.renumbered_src_col_name
-        dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
-        renumber_ddf = input_graph.renumber_map.implementation.ddf
-        col_names = input_graph.renumber_map.implementation.col_names
-        if isinstance(start,
-                      dask_cudf.DataFrame) or isinstance(start,
-                                                         cudf.DataFrame):
-            tmp_df = start
-            tmp_col_names = start.columns
-        else:
-            tmp_df = cudf.DataFrame()
-            tmp_df["0"] = cudf.Series(start)
-            tmp_col_names = ["0"]
-
-        original_start_len = len(tmp_df)
-
-        tmp_ddf = tmp_df[tmp_col_names].rename(
-            columns=dict(zip(tmp_col_names, col_names)))
-        for name in col_names:
-            tmp_ddf[name] = tmp_ddf[name].astype(renumber_ddf[name].dtype)
-        renumber_data = get_distributed_data(renumber_ddf)
-        start = [client.submit(df_merge,
-                               wf[1],
-                               tmp_ddf,
-                               col_names,
-                               workers=[wf[0]])
-                 for idx, wf in enumerate(renumber_data.worker_to_parts.items()
-                                          )
-                 ]
-
-        def count_src(df):
-            return len(df)
-
-        count_src_results = client.map(count_src, start)
-        cg = client.gather(count_src_results)
-        if sum(cg) < original_start_len:
-            raise ValueError('At least one start vertex provided was invalid')
-
-    else:
-        # If the input graph was created with renumbering disabled (Graph(...,
-        # renumber=False), the above compute_renumber_edge_list() call will not
-        # perform a renumber step and the renumber_map will not have src/dst
-        # col names. In that case, the src/dst values specified when reading
-        # the edgelist dataframe are to be used, but only if they were single
-        # string values (ie. not a list representing multi-columns).
-        if isinstance(input_graph.source_columns, Iterable):
-            raise RuntimeError("input_graph was not renumbered but has a "
-                               "non-string source column name (got: "
-                               f"{input_graph.source_columns}). Re-create "
-                               "input_graph with either renumbering enabled "
-                               "or a source column specified as a string.")
-        if isinstance(input_graph.destination_columns, Iterable):
-            raise RuntimeError("input_graph was not renumbered but has a "
-                               "non-string destination column name (got: "
-                               f"{input_graph.destination_columns}). "
-                               "Re-create input_graph with either renumbering "
-                               "enabled or a destination column specified as "
-                               "a string.")
-        src_col_name = input_graph.source_columns
-        dst_col_name = input_graph.destination_columns
-
-    result = [client.submit(
-        call_bfs,
+    start = [
+        client.submit(
+            df_merge,
+            wf[1],
+            tmp_ddf,
+            col_names,
+            workers=[wf[0]]
+        )
+        for idx, wf in enumerate(renumber_data.worker_to_parts.items())
+    ]
+
+    def count_src(df):
+        return len(df)
+
+    count_src_results = client.map(count_src, start)
+    cg = client.gather(count_src_results)
+    if sum(cg) < original_start_len:
+        raise ValueError('At least one start vertex provided was invalid')
+
+    cupy_result = [client.submit(
+        _call_plc_mg_bfs,
        Comms.get_session_id(),
        wf[1],
+        start[idx],
+        depth_limit,
        src_col_name,
        dst_col_name,
-        num_verts,
+        graph_properties,
        num_edges,
-        vertex_partition_offsets,
-        input_graph.aggregate_segment_offsets,
-        start[idx],
-        depth_limit,
+        False,
+        True,
        return_distances,
        workers=[wf[0]])
        for idx, wf in enumerate(data.worker_to_parts.items())]
-    wait(result)
-    ddf = dask_cudf.from_delayed(result)
+    wait(cupy_result)
+
+    cudf_result = [client.submit(convert_to_cudf,
+                                 cp_arrays)
+                   for cp_arrays in cupy_result]
+    wait(cudf_result)
+
+    ddf = dask_cudf.from_delayed(cudf_result)

    if input_graph.renumbered:
        ddf = input_graph.unrenumber(ddf, 'vertex')
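For context, the public entry point this file implements is cugraph.dask.bfs. Below is a minimal usage sketch; the cluster setup, the file name "edges.csv", and its column names are illustrative assumptions, not part of this commit:

# Hypothetical driver script; assumes a multi-GPU Dask cluster is available.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import cugraph
import cugraph.dask as dcg
import cugraph.dask.comms.comms as Comms
import dask_cudf

cluster = LocalCUDACluster()
client = Client(cluster)
Comms.initialize(p2p=True)

# "edges.csv" and its column names are placeholders.
ddf = dask_cudf.read_csv("edges.csv", names=["src", "dst"],
                         dtype=["int32", "int32"])
G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, source="src", destination="dst")

# Returns a dask_cudf DataFrame with vertex, distance, predecessor columns.
df = dcg.bfs(G, start=0)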
35 changes: 0 additions & 35 deletions python/cugraph/cugraph/dask/traversal/mg_bfs.pxd

This file was deleted.
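This .pxd declared the Cython mg_bfs wrapper used by the old call_bfs path; its role is taken over by the pylibcugraph call in the new _call_plc_mg_bfs above. A sketch of the replacement call pattern follows, with argument meanings inferred from the diff (the per-argument comments are assumptions, not authoritative documentation):

# resource_handle, mg, and sources are built per worker as shown in the diff.
from pylibcugraph import bfs as pylibcugraph_bfs

distances, predecessors, vertices = pylibcugraph_bfs(
    resource_handle,   # ResourceHandle wrapping the raft comms handle
    mg,                # MGGraph over this worker's edge partition
    sources,           # cudf.Series of int32 start vertices
    False,             # direction_optimizing
    0,                 # depth limit; the diff passes 0 when depth_limit is None
    True,              # compute predecessors
    True)              # expensive input validation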

(7 more changed files not shown)
