Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] OPG pagerank (Py) #944

Merged
merged 11 commits into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## New Features
- PR #937 Add wrapper for gunrock HITS algorithm
- PR #939 Updated Notebooks to include new features and benchmarks
- PR #944 opg pagerank dask

## Improvements
- PR #898 Add Edge Betweenness Centrality, and endpoints to BC
Expand Down
3 changes: 2 additions & 1 deletion cpp/include/algorithms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ namespace cugraph {
*
*/
template <typename VT, typename ET, typename WT>
void pagerank(experimental::GraphCSCView<VT, ET, WT> const &graph,
void pagerank(raft::handle_t const &handle,
experimental::GraphCSCView<VT, ET, WT> const &graph,
WT *pagerank,
VT personalization_subset_size = 0,
VT *personalization_subset = nullptr,
Expand Down
15 changes: 14 additions & 1 deletion cpp/include/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,32 @@ class GraphViewBase {
VT number_of_vertices;
ET number_of_edges;

VT *local_vertices;
ET *local_edges;
VT *local_offsets;

/**
* @brief Fill the identifiers array with the vertex identifiers.
*
* @param[out] identifier Pointer to device memory to store the vertex
* identifiers
*/
void get_vertex_identifiers(VT *identifiers) const;
void set_local_data(VT *local_vertices_, ET *local_edges_, VT *local_offsets_)
{
local_vertices = local_vertices_;
local_edges = local_edges_;
local_offsets = local_offsets_;
}
void set_handle(raft::handle_t *handle_) { handle = handle_; }
GraphViewBase(WT *edge_data_, VT number_of_vertices_, ET number_of_edges_)
: edge_data(edge_data_),
prop(),
number_of_vertices(number_of_vertices_),
number_of_edges(number_of_edges_)
number_of_edges(number_of_edges_),
local_vertices(nullptr),
local_edges(nullptr),
local_offsets(nullptr)
{
handle = new raft::handle_t;
}
Expand Down
16 changes: 13 additions & 3 deletions cpp/src/link_analysis/pagerank.cu
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ void pagerank_impl(experimental::GraphCSCView<VT, ET, WT> const &graph,
} // namespace detail

template <typename VT, typename ET, typename WT>
void pagerank(experimental::GraphCSCView<VT, ET, WT> const &graph,
void pagerank(raft::handle_t const &handle,
experimental::GraphCSCView<VT, ET, WT> const &graph,
WT *pagerank,
VT personalization_subset_size,
VT *personalization_subset,
Expand All @@ -350,6 +351,13 @@ void pagerank(experimental::GraphCSCView<VT, ET, WT> const &graph,
{
CUGRAPH_EXPECTS(pagerank != nullptr, "Invalid API parameter: Pagerank array should be of size V");

if (handle.comms_initialized()) {
afender marked this conversation as resolved.
Show resolved Hide resolved
std::cout << "\nINSIDE CPP\n";
auto &comm = handle.get_comms();
std::cout << comm.get_rank() << "\n";
std::cout << comm.get_size() << "\n";
return;
}
return detail::pagerank_impl<VT, ET, WT>(graph,
pagerank,
personalization_subset_size,
Expand All @@ -362,7 +370,8 @@ void pagerank(experimental::GraphCSCView<VT, ET, WT> const &graph,
}

// explicit instantiation
template void pagerank<int, int, float>(experimental::GraphCSCView<int, int, float> const &graph,
template void pagerank<int, int, float>(raft::handle_t const &handle,
experimental::GraphCSCView<int, int, float> const &graph,
float *pagerank,
int personalization_subset_size,
int *personalization_subset,
Expand All @@ -371,7 +380,8 @@ template void pagerank<int, int, float>(experimental::GraphCSCView<int, int, flo
double tolerance,
int64_t max_iter,
bool has_guess);
template void pagerank<int, int, double>(experimental::GraphCSCView<int, int, double> const &graph,
template void pagerank<int, int, double>(raft::handle_t const &handle,
experimental::GraphCSCView<int, int, double> const &graph,
double *pagerank,
int personalization_subset_size,
int *personalization_subset,
Expand Down
6 changes: 3 additions & 3 deletions cpp/tests/pagerank/pagerank_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,14 @@ class Tests_Pagerank : public ::testing::TestWithParam<Pagerank_Usecase> {
if (PERF) {
hr_clock.start();
for (int i = 0; i < PERF_MULTIPLIER; ++i) {
cugraph::pagerank<int, int, T>(G, d_pagerank);
cugraph::pagerank<int, int, T>(G.handle[0], G, d_pagerank);
cudaDeviceSynchronize();
}
hr_clock.stop(&time_tmp);
pagerank_time.push_back(time_tmp);
} else {
cudaProfilerStart();
cugraph::pagerank<int, int, T>(G, d_pagerank);
cugraph::pagerank<int, int, T>(G.handle[0], G, d_pagerank);
cudaProfilerStop();
cudaDeviceSynchronize();
}
Expand Down Expand Up @@ -207,4 +207,4 @@ int main(int argc, char** argv)
rmm::mr::set_default_resource(resource.get());
int rc = RUN_ALL_TESTS();
return rc;
}
}
2 changes: 2 additions & 0 deletions python/cugraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
from cugraph.layout import force_atlas2
from cugraph.raft import raft_include_test

from cugraph.opg.link_analysis.mg_pagerank_wrapper import mg_pagerank

# Versioneer
from ._version import get_versions
__version__ = get_versions()['version']
Expand Down
15 changes: 15 additions & 0 deletions python/cugraph/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .opg_pagerank.pagerank import pagerank
from .common.read_utils import get_chunksize
41 changes: 39 additions & 2 deletions python/cugraph/dask/common/input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ def create(cls, data, client=None):
raise Exception("Graph data must be dask-cudf dataframe")

gpu_futures = client.sync(_extract_partitions, data, client)
workers = tuple(set(map(lambda x: x[0], gpu_futures)))

workers = tuple(OrderedDict.fromkeys(map(lambda x: x[0], gpu_futures)))
return DistributedDataHandler(gpu_futures=gpu_futures, workers=workers,
datatype=datatype, multiple=multiple,
client=client)
Expand Down Expand Up @@ -133,6 +132,36 @@ def calculate_parts_to_sizes(self, comms=None, ranks=None):

self.total_rows += total

def calculate_local_data(self, comms):

if self.worker_info is None and comms is not None:
self.calculate_worker_and_rank_info(comms)

local_data = dict([(self.worker_info[wf[0]]["rank"],
self.client.submit(
get_local_data,
wf[1],
workers=[wf[0]]))
for idx, wf in enumerate(self.worker_to_parts.items()
)])

_local_data_dict = self.client.compute(local_data, sync=True)
local_data_dict = {'edges': [], 'offsets': [], 'verts': []}
for rank in range(len(_local_data_dict)):
data = _local_data_dict[rank]
local_data_dict['edges'].append(data[0])
local_data_dict['offsets'].append(data[1])
local_data_dict['verts'].append(data[2])

import numpy as np
local_data_dict['edges'] = np.array(local_data_dict['edges'],
dtype=np.int32)
local_data_dict['offsets'] = np.array(local_data_dict['offsets'],
dtype=np.int32)
local_data_dict['verts'] = np.array(local_data_dict['verts'],
dtype=np.int32)
return local_data_dict


""" Internal methods, API subject to change """

Expand All @@ -156,3 +185,11 @@ def _get_rows(objs, multiple):
def get_obj(x): return x[0] if multiple else x
total = list(map(lambda x: get_obj(x).shape[0], objs))
return total, reduce(lambda a, b: a + b, total)


def get_local_data(df):
df = df[0]
num_local_edges = len(df)
local_offset = df['dst'].min()
num_local_verts = df['dst'].max() - local_offset + 1
return num_local_edges, local_offset, num_local_verts
45 changes: 45 additions & 0 deletions python/cugraph/dask/common/read_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def get_n_gpus():
import os
try:
return len(os.environ["CUDA_VISIBLE_DEVICES"].split(","))
except KeyError:
return len(os.popen("nvidia-smi -L").read().strip().split("\n"))


def get_chunksize(input_path):
"""
Calculate the appropriate chunksize for dask_cudf.read_csv
to get a number of partitions equal to the number of GPUs
Examples
--------
>>> import dask_cugraph.pagerank as dcg
>>> chunksize = dcg.get_chunksize(edge_list.csv)
"""

import os
from glob import glob
import math

input_files = sorted(glob(str(input_path)))
if len(input_files) == 1:
size = os.path.getsize(input_files[0])
chunksize = math.ceil(size/get_n_gpus())
else:
size = [os.path.getsize(_file) for _file in input_files]
chunksize = max(size)
return chunksize
Empty file.
49 changes: 49 additions & 0 deletions python/cugraph/dask/opg_pagerank/pagerank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Copyright (c) 2019-2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from cugraph.raft.dask.common.comms import Comms
from cugraph.dask.common.input_utils import DistributedDataHandler
from dask.distributed import wait, default_client
from cugraph.raft.dask.common.comms import worker_state
from cugraph.opg.link_analysis import mg_pagerank_wrapper as mg_pagerank


def common_func(sID, data, local_data):
print("Dataframe: ", data)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all of these debugging artifacts? Is it required to print all of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This going to be updated/removed in the final PR. Currently these prints exist for checking input validity as we can't verify results pagerank without Opg cuda code.

print("local data: ", local_data)
sessionstate = worker_state(sID)
mg_pagerank.mg_pagerank(data[0], local_data, sessionstate['handle'])
return 1


def pagerank(input_graph):
print("INSIDE DASK PAGERANK")
client = default_client()
_ddf = input_graph.edgelist.edgelist_df
ddf = _ddf.sort_values(by='dst', ignore_index=True)
data = DistributedDataHandler.create(data=ddf)
comms = Comms(comms_p2p=False)
comms.init(data.workers)
local_data = data.calculate_local_data(comms)
print("Calling function")
result = dict([(data.worker_info[wf[0]]["rank"],
client.submit(
common_func,
comms.sessionId,
wf[1],
local_data,
workers=[wf[0]]))
for idx, wf in enumerate(data.worker_to_parts.items())])
wait(result)
1 change: 0 additions & 1 deletion python/cugraph/dask/pagerank/pagerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def pagerank(edge_list, alpha=0.85, max_iter=30):
>>> dtype=['int32', 'int32'])
>>> pr = dcg.pagerank(ddf_edge_list, alpha=0.85, max_iter=50)
"""

Copy link
Member

@afender afender Jun 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some dead code from SNMG and/or things that should be in RAFT in this file?

Copy link
Contributor Author

@Iroy30 Iroy30 Jun 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is entirely snmg code. It simply connects to cugraph/python/cugraph/snmg/link_analysis/mg_pagerank.py which throws an exception saying it is disabled. We could update it to throw exception at python/cugraph/dask/pagerank/pagerank.py itself or just remove it completely for 0.15 release once we have OPG pagerank.

client = default_client()
gpu_futures = _get_mg_info(edge_list)
# npartitions = len(gpu_futures)
Expand Down
1 change: 1 addition & 0 deletions python/cugraph/link_analysis/pagerank.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ from libcpp cimport bool
cdef extern from "algorithms.hpp" namespace "cugraph":

cdef void pagerank[VT,ET,WT](
const handle_t &handle,
const GraphCSCView[VT,ET,WT] &graph,
WT *pagerank,
VT size,
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/link_analysis/pagerank_wrapper.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ def pagerank(input_graph, alpha=0.85, personalization=None, max_iter=100, tol=1.
if (df['pagerank'].dtype == np.float32):
graph_float = GraphCSCView[int,int,float](<int*>c_offsets, <int*>c_indices, <float*>c_weights, num_verts, num_edges)

c_pagerank[int,int,float](graph_float, <float*> c_pagerank_val, sz, <int*> c_pers_vtx, <float*> c_pers_val,
c_pagerank[int,int,float](graph_float.handle[0], graph_float, <float*> c_pagerank_val, sz, <int*> c_pers_vtx, <float*> c_pers_val,
<float> alpha, <float> tol, <int> max_iter, has_guess)
graph_float.get_vertex_identifiers(<int*>c_identifier)
else:
graph_double = GraphCSCView[int,int,double](<int*>c_offsets, <int*>c_indices, <double*>c_weights, num_verts, num_edges)
c_pagerank[int,int,double](graph_double, <double*> c_pagerank_val, sz, <int*> c_pers_vtx, <double*> c_pers_val,
c_pagerank[int,int,double](graph_double.handle[0], graph_double, <double*> c_pagerank_val, sz, <int*> c_pers_vtx, <double*> c_pers_val,
<float> alpha, <float> tol, <int> max_iter, has_guess)
graph_double.get_vertex_identifiers(<int*>c_identifier)

Expand Down
Empty file.
17 changes: 17 additions & 0 deletions python/cugraph/opg/link_analysis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from cugraph.opg.link_analysis.mg_pagerank_wrapper import mg_pagerank
33 changes: 33 additions & 0 deletions python/cugraph/opg/link_analysis/mg_pagerank.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#
# Copyright (c) 2020, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from cugraph.structure.graph_new cimport *
from libcpp cimport bool


cdef extern from "algorithms.hpp" namespace "cugraph":

cdef void pagerank[VT,ET,WT](
const handle_t &handle,
const GraphCSCView[VT,ET,WT] &graph,
WT *pagerank,
VT size,
VT *personalization_subset,
WT *personalization_values,
double alpha,
double tolerance,
long long max_iter,
bool has_guess) except +
Loading