Skip to content

Commit

Permalink
Add MG neighborhood sampling to pylibcugraph & cugraph APIs (#2118)
Browse files Browse the repository at this point in the history
Closes #2108 when merged. Requires both #2088 and #2156 to be merged before, the former because this uses MGGraph, and the later because of the C implementation of neighborhood sampling.

Authors:
  - https://github.com/betochimas
  - Joseph Nke (https://github.com/jnke2016)
  - Rick Ratzel (https://github.com/rlratzel)

Approvers:
  - Don Acosta (https://github.com/acostadon)
  - Rick Ratzel (https://github.com/rlratzel)
  - Joseph Nke (https://github.com/jnke2016)
  - Chuck Hastings (https://github.com/ChuckHastings)
  - Jordan Jacobelli (https://github.com/Ethyling)

URL: #2118
  • Loading branch information
betochimas authored Mar 31, 2022
1 parent 87a170a commit 38be932
Show file tree
Hide file tree
Showing 17 changed files with 765 additions and 31 deletions.
6 changes: 3 additions & 3 deletions ci/test.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2019-2021, NVIDIA CORPORATION.
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -96,9 +96,9 @@ cd ${CUGRAPH_ROOT}/python/pylibcugraph/pylibcugraph
pytest --cache-clear --junitxml=${CUGRAPH_ROOT}/junit-pylibcugraph-pytests.xml -v --cov-config=.coveragerc --cov=pylibcugraph --cov-report=xml:${WORKSPACE}/python/pylibcugraph/pylibcugraph-coverage.xml --cov-report term --ignore=raft --benchmark-disable
echo "Ran Python pytest for pylibcugraph : return code was: $?, test script exit code is now: $EXITCODE"

echo "Python pytest for cuGraph..."
echo "Python pytest for cuGraph (single-GPU only)..."
cd ${CUGRAPH_ROOT}/python/cugraph/cugraph
pytest --cache-clear --junitxml=${CUGRAPH_ROOT}/junit-cugraph-pytests.xml -v --cov-config=.coveragerc --cov=cugraph --cov-report=xml:${WORKSPACE}/python/cugraph/cugraph-coverage.xml --cov-report term --ignore=raft --benchmark-disable
pytest --cache-clear --junitxml=${CUGRAPH_ROOT}/junit-cugraph-pytests.xml -v --cov-config=.coveragerc --cov=cugraph --cov-report=xml:${WORKSPACE}/python/cugraph/cugraph-coverage.xml --cov-report term --ignore=raft --ignore=tests/dask --benchmark-disable
echo "Ran Python pytest for cugraph : return code was: $?, test script exit code is now: $EXITCODE"

echo "Python benchmarks for cuGraph (running as tests)..."
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/c_api/uniform_neighbor_sampling.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ struct uniform_neighbor_sampling_functor : public cugraph::c_api::abstract_funct

result_ = new cugraph::c_api::cugraph_sample_result_t{
new cugraph::c_api::cugraph_type_erased_device_array_t(srcs, graph_->vertex_type_),
new cugraph::c_api::cugraph_type_erased_device_array_t(dsts, graph_->weight_type_),
new cugraph::c_api::cugraph_type_erased_device_array_t(dsts, graph_->vertex_type_),
new cugraph::c_api::cugraph_type_erased_device_array_t(labels, start_label_->type_),
new cugraph::c_api::cugraph_type_erased_device_array_t(indices, graph_->edge_type_),
new cugraph::c_api::cugraph_type_erased_host_array_t(counts, graph_->vertex_type_)};
Expand Down
11 changes: 11 additions & 0 deletions datasets/small_tree.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
0 1 1.0
0 2 1.0
0 3 1.0
0 4 1.0
1 5 1.0
2 5 1.0
3 5 1.0
4 5 1.0
5 6 1.0
5 7 1.0
5 8 1.0
12 changes: 12 additions & 0 deletions python/cugraph/cugraph/dask/sampling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
187 changes: 187 additions & 0 deletions python/cugraph/cugraph/dask/sampling/neighborhood_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy
from dask.distributed import wait, default_client

import dask_cudf
import cudf

from pylibcugraph.experimental import (MGGraph,
ResourceHandle,
GraphProperties,
uniform_neighborhood_sampling,
)
from cugraph.dask.common.input_utils import get_distributed_data
from cugraph.comms import comms as Comms


def call_nbr_sampling(sID,
data,
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
info_list,
h_fan_out,
with_replacement):

# Preparation for graph creation
handle = Comms.get_handle(sID)
handle = ResourceHandle(handle.getHandle())
graph_properties = GraphProperties(is_symmetric=False, is_multigraph=False)
srcs = data[0][src_col_name]
dsts = data[0][dst_col_name]
weights = None
if "value" in data[0].columns:
weights = data[0]['value']

store_transposed = False

mg = MGGraph(handle,
graph_properties,
srcs,
dsts,
weights,
store_transposed,
num_edges,
do_expensive_check)

ret_val = uniform_neighborhood_sampling(handle,
mg,
start_list,
info_list,
h_fan_out,
with_replacement,
do_expensive_check)
return ret_val


def convert_to_cudf(cp_arrays):
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
"""
cupy_sources, cupy_destinations, cupy_labels, cupy_indices = cp_arrays
# cupy_sources, cupy_destinations, cupy_labels, cupy_indices,
# cupy_counts = cp_arrays
df = cudf.DataFrame()
df["sources"] = cupy_sources
df["destinations"] = cupy_destinations
df["labels"] = cupy_labels
df["indices"] = cupy_indices
# df["counts"] = cupy_counts
return df


def EXPERIMENTAL__uniform_neighborhood(input_graph,
start_info_list,
fanout_vals,
with_replacement=True):
"""
Does neighborhood sampling, which samples nodes from a graph based on the
current node's neighbors, with a corresponding fanout value at each hop.
Parameters
----------
input_graph : cugraph.DiGraph
cuGraph graph, which contains connectivity information as dask cudf
edge list dataframe
start_info_list : tuple of list or cudf.Series
Tuple of a list of starting vertices for sampling, along with a
corresponding list of label for reorganizing results after sending
the input to different callers.
fanout_vals : list
List of branching out (fan-out) degrees per starting vertex for each
hop level.
with_replacement: bool, optional (default=True)
Flag to specify if the random sampling is done with replacement
Returns
-------
result : dask_cudf.DataFrame
GPU data frame containing two dask_cudf.Series
ddf['sources']: dask_cudf.Series
Contains the source vertices from the sampling result
ddf['destinations']: dask_cudf.Series
Contains the destination vertices from the sampling result
ddf['labels']: dask_cudf.Series
Contains the start labels from the sampling result
ddf['indices']: dask_cudf.Series
Contains the indices from the sampling result for path
reconstruction
"""
# Initialize dask client
client = default_client()
# Important for handling renumbering
input_graph.compute_renumber_edge_list(transposed=False)

start_list, info_list = start_info_list

if isinstance(start_list, list):
start_list = cudf.Series(start_list)
if isinstance(info_list, list):
info_list = cudf.Series(info_list)
# fanout_vals must be a host array!
# FIXME: ensure other sequence types (eg. cudf Series) can be handled.
if isinstance(fanout_vals, list):
fanout_vals = numpy.asarray(fanout_vals, dtype="int32")
else:
raise TypeError("fanout_vals must be a list, "
f"got: {type(fanout_vals)}")

ddf = input_graph.edgelist.edgelist_df
num_edges = len(ddf)
data = get_distributed_data(ddf)

src_col_name = input_graph.renumber_map.renumbered_src_col_name
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name

# start_list uses "external" vertex IDs, but since the graph has been
# renumbered, the start vertex IDs must also be renumbered.
start_list = input_graph.lookup_internal_vertex_id(start_list).compute()
do_expensive_check = True

result = [client.submit(call_nbr_sampling,
Comms.get_session_id(),
wf[1],
src_col_name,
dst_col_name,
num_edges,
do_expensive_check,
start_list,
info_list,
fanout_vals,
with_replacement,
workers=[wf[0]])
for idx, wf in enumerate(data.worker_to_parts.items())]

wait(result)

cudf_result = [client.submit(convert_to_cudf,
cp_arrays)
for cp_arrays in result]

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result)
if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources")
ddf = input_graph.unrenumber(ddf, "destinations")

return ddf
19 changes: 19 additions & 0 deletions python/cugraph/cugraph/experimental/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from cugraph.utilities.api_tools import experimental_warning_wrapper

from cugraph.dask.sampling.neighborhood_sampling import \
EXPERIMENTAL__uniform_neighborhood
uniform_neighborhood_sampling = \
experimental_warning_wrapper(EXPERIMENTAL__uniform_neighborhood)
133 changes: 133 additions & 0 deletions python/cugraph/cugraph/tests/dask/test_mg_neighborhood_sampling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import pytest
import cugraph.dask as dcg
import cugraph
import dask_cudf
import cudf
from cugraph.dask.common.mg_utils import is_single_gpu
from cugraph.tests import utils


# =============================================================================
# Test helpers
# =============================================================================
def setup_function():
gc.collect()


# datasets = utils.RAPIDS_DATASET_ROOT_DIR_PATH/"karate.csv"
datasets = utils.DATASETS_SMALL
fixture_params = utils.genFixtureParamsProduct((datasets, "graph_file"))


def _get_param_args(param_name, param_values):
"""
Returns a tuple of (<param_name>, <pytest.param list>) which can be applied
as the args to pytest.mark.parametrize(). The pytest.param list also
contains param id string formed from the param name and values.
"""
return (param_name,
[pytest.param(v, id=f"{param_name}={v}") for v in param_values])


@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
def test_mg_neighborhood_sampling_simple(dask_client):

from cugraph.experimental.dask import uniform_neighborhood_sampling

df = cudf.DataFrame({"src": cudf.Series([0, 1, 1, 2, 2, 2, 3, 4],
dtype="int32"),
"dst": cudf.Series([1, 3, 4, 0, 1, 3, 5, 5],
dtype="int32"),
"value": cudf.Series([0.1, 2.1, 1.1, 5.1, 3.1,
4.1, 7.2, 3.2],
dtype="float32"),
})
ddf = dask_cudf.from_cudf(df, npartitions=2)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, "src", "dst", "value")

# TODO: Incomplete, include more testing for tree graph as well as
# for larger graphs
start_list = cudf.Series([0, 1], dtype="int32")
info_list = cudf.Series([0, 0], dtype="int32")
fanout_vals = [1, 1]
with_replacement = True
result_nbr = uniform_neighborhood_sampling(G,
(start_list, info_list),
fanout_vals,
with_replacement)
result_nbr = result_nbr.compute()

# Since the validity of results have (probably) been tested at botht he C++
# and C layers, simply test that the python interface and conversions were
# done correctly.
assert result_nbr['sources'].dtype == "int32"
assert result_nbr['destinations'].dtype == "int32"
assert result_nbr['labels'].dtype == "int32"
assert result_nbr['indices'].dtype == "int32"

# ALl labels should be 0 or 1
assert result_nbr['labels'].isin([0, 1]).all()


@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
def test_mg_neighborhood_sampling_tree(dask_client):

from cugraph.experimental.dask import uniform_neighborhood_sampling

input_data_path = (utils.RAPIDS_DATASET_ROOT_DIR_PATH /
"small_tree.csv").as_posix()
chunksize = dcg.get_chunksize(input_data_path)

ddf = dask_cudf.read_csv(
input_data_path,
chunksize=chunksize,
delimiter=" ",
names=["src", "dst", "value"],
dtype=["int32", "int32", "float32"],
)

G = cugraph.Graph(directed=True)
G.from_dask_cudf_edgelist(ddf, "src", "dst", "value")

# TODO: Incomplete, include more testing for tree graph as well as
# for larger graphs
start_list = cudf.Series([0, 0], dtype="int32")
info_list = cudf.Series([0, 0], dtype="int32")
fanout_vals = [4, 1, 3]
with_replacement = True
result_nbr = uniform_neighborhood_sampling(G,
(start_list, info_list),
fanout_vals,
with_replacement)
result_nbr = result_nbr.compute()

# Since the validity of results have (probably) been tested at botht he C++
# and C layers, simply test that the python interface and conversions were
# done correctly.
assert result_nbr['sources'].dtype == "int32"
assert result_nbr['destinations'].dtype == "int32"
assert result_nbr['labels'].dtype == "int32"
assert result_nbr['indices'].dtype == "int32"

# All labels should be 0
assert (result_nbr['labels'] == 0).all()
Loading

0 comments on commit 38be932

Please sign in to comment.