Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEA]: Add DASK edgelist and graph support to the Dataset API #4035

Merged
merged 11 commits into from
Jan 9, 2024
124 changes: 115 additions & 9 deletions python/cugraph/cugraph/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
# limitations under the License.

import cudf
import dask_cudf
import yaml
import os
import pandas as pd
import cugraph.dask as dcg
from pathlib import Path
import urllib.request
from cugraph.structure.graph_classes import Graph


Expand Down Expand Up @@ -138,9 +141,8 @@ def __download_csv(self, url):

filename = self.metadata["name"] + self.metadata["file_type"]
if self._dl_path.path.is_dir():
df = cudf.read_csv(url)
self._path = self._dl_path.path / filename
df.to_csv(self._path, index=False)
urllib.request.urlretrieve(url, str(self._path))

else:
raise RuntimeError(
Expand All @@ -149,7 +151,6 @@ def __download_csv(self, url):
return self._path

def unload(self):

"""
Remove all saved internal objects, forcing them to be re-created when
accessed.
Expand All @@ -162,7 +163,7 @@ def unload(self):

def get_edgelist(self, download=False, reader="cudf"):
"""
Return an Edgelist
Return an Edgelist.

Parameters
----------
Expand Down Expand Up @@ -212,6 +213,47 @@ def get_edgelist(self, download=False, reader="cudf"):

return self._edgelist.copy()

def get_dask_edgelist(self, download=False):
"""
Return a distributed Edgelist.

Parameters
----------
download : Boolean (default=False)
Automatically download the dataset from the 'url' location within
the YAML file.
"""
if self._edgelist is None:
full_path = self.get_path()
if not full_path.is_file():
if download:
full_path = self.__download_csv(self.metadata["url"])
else:
raise RuntimeError(
f"The datafile {full_path} does not"
" exist. Try setting download=True"
" to download the datafile"
)

header = None
if isinstance(self.metadata["header"], int):
header = self.metadata["header"]

blocksize = dcg.get_chunksize(full_path)
self._edgelist = dask_cudf.read_csv(
path=full_path,
blocksize=blocksize,
delimiter=self.metadata["delim"],
names=self.metadata["col_names"],
dtype={
self.metadata["col_names"][i]: self.metadata["col_types"][i]
for i in range(len(self.metadata["col_types"]))
},
header=header,
)

return self._edgelist.copy()

def get_graph(
self,
download=False,
Expand Down Expand Up @@ -249,10 +291,10 @@ def get_graph(
if create_using is None:
G = Graph()
elif isinstance(create_using, Graph):
# what about BFS if trnaposed is True
# what about BFS if transposed is True
attrs = {"directed": create_using.is_directed()}
G = type(create_using)(**attrs)
elif type(create_using) is type:
elif issubclass(create_using, Graph):
G = create_using()
else:
raise TypeError(
Expand All @@ -277,9 +319,74 @@ def get_graph(
)
return G

def get_dask_graph(
self,
download=False,
create_using=Graph,
ignore_weights=False,
store_transposed=False,
):
"""
Return a distributed Graph object.

Parameters
----------
download : Boolean (default=False)
Downloads the dataset from the web.

create_using: cugraph.Graph (instance or class), optional
(default=Graph)
Specify the type of Graph to create. Can pass in an instance to
create a Graph instance with specified 'directed' attribute.

ignore_weights : Boolean (default=False)
Ignores weights in the dataset if True, resulting in an
unweighted Graph. If False (the default), weights from the
dataset -if present- will be applied to the Graph. If the
dataset does not contain weights, the Graph returned will
be unweighted regardless of ignore_weights.

store_transposed : bool, optional (default=False)
If True, stores the transpose of the adjacency matrix. Required
for certain algorithms.
"""
if self._edgelist is None:
self.get_dask_edgelist(download)

if create_using is None:
G = Graph()
elif isinstance(create_using, Graph):
attrs = {"directed": create_using.is_directed()}
G = type(create_using)(**attrs)
elif issubclass(create_using, Graph):
G = create_using()
else:
raise TypeError(
"create_using must be a cugraph.Graph "
"(or subclass) type or instance, got: "
f"{type(create_using)}"
)

if len(self.metadata["col_names"]) > 2 and not (ignore_weights):
G.from_dask_cudf_edgelist(
self._edgelist,
source=self.metadata["col_names"][0],
destination=self.metadata["col_names"][1],
edge_attr=self.metadata["col_names"][2],
store_transposed=store_transposed,
)
else:
G.from_dask_cudf_edgelist(
self._edgelist,
source=self.metadata["col_names"][0],
destination=self.metadata["col_names"][1],
store_transposed=store_transposed,
)
return G

def get_path(self):
"""
Returns the location of the stored dataset file
Returns the location of the stored dataset file.
"""
if self._path is None:
self._path = self._dl_path.path / (
Expand Down Expand Up @@ -347,8 +454,7 @@ def download_all(force=False):
filename = meta["name"] + meta["file_type"]
save_to = default_download_dir.path / filename
if not save_to.is_file() or force:
df = cudf.read_csv(meta["url"])
df.to_csv(save_to, index=False)
urllib.request.urlretrieve(meta["url"], str(save_to))


def set_download_dir(path):
Expand Down
75 changes: 74 additions & 1 deletion python/cugraph/cugraph/tests/utils/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import pytest

import cudf
import dask_cudf
from cugraph.structure import Graph
from cugraph.testing import (
RAPIDS_DATASET_ROOT_DIR_PATH,
Expand All @@ -29,6 +30,7 @@
BENCHMARKING_DATASETS,
)
from cugraph import datasets
from cugraph.dask.common.mg_utils import is_single_gpu

# Add the sg marker to all tests in this module.
pytestmark = pytest.mark.sg
Expand All @@ -37,6 +39,7 @@
###############################################################################
# Fixtures


# module fixture - called once for this module
@pytest.fixture(scope="module")
def tmpdir():
Expand Down Expand Up @@ -77,6 +80,7 @@ def setup(tmpdir):
###############################################################################
# Helpers


# check if there is a row where src == dst
def has_selfloop(dataset):
if not dataset.metadata["is_directed"]:
Expand Down Expand Up @@ -115,6 +119,7 @@ def is_symmetric(dataset):
###############################################################################
# Tests


# setting download_dir to None effectively re-initialized the default
def test_env_var():
os.environ["RAPIDS_DATASET_ROOT_DIR"] = "custom_storage_location"
Expand Down Expand Up @@ -150,9 +155,19 @@ def test_download(dataset):
assert dataset.get_path().is_file()


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_download_dask(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)

assert E is not None
assert dataset.get_path().is_file()


@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_reader(dataset):
# defaults to using cudf.read_csv
# defaults to using cudf
E = dataset.get_edgelist(download=True)

assert E is not None
Expand All @@ -171,18 +186,46 @@ def test_reader(dataset):
dataset.get_edgelist(reader=None)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_reader_dask(dask_client, dataset):
# using dask_cudf
E = dataset.get_dask_edgelist(download=True)

assert E is not None
assert isinstance(E, dask_cudf.core.DataFrame)
dataset.unload()


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_edgelist(dataset):
E = dataset.get_edgelist(download=True)
assert E is not None


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_dask_edgelist(dask_client, dataset):
E = dataset.get_dask_edgelist(download=True)
assert E is not None


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_graph(dataset):
G = dataset.get_graph(download=True)
assert G is not None


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_get_dask_graph(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert G is not None


@pytest.mark.parametrize("dataset", ALL_DATASETS)
def test_metadata(dataset):
M = dataset.metadata
Expand All @@ -207,6 +250,16 @@ def test_weights(dataset):
assert not G.is_weighted()


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", WEIGHTED_DATASETS)
def test_weights_dask(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert G.is_weighted()
G = dataset.get_dask_graph(download=True, ignore_weights=True)
assert not G.is_weighted()


@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_create_using(dataset):
G = dataset.get_graph(download=True)
Expand All @@ -216,6 +269,26 @@ def test_create_using(dataset):
G = dataset.get_graph(download=True, create_using=Graph(directed=True))
assert G.is_directed()

# using a non-Graph type should raise an error
with pytest.raises(TypeError):
dataset.get_graph(download=True, create_using=set)


@pytest.mark.skipif(is_single_gpu(), reason="skipping MG testing on Single GPU system")
@pytest.mark.skip(reason="MG not supported on CI")
@pytest.mark.parametrize("dataset", SMALL_DATASETS)
def test_create_using_dask(dask_client, dataset):
G = dataset.get_dask_graph(download=True)
assert not G.is_directed()
G = dataset.get_dask_graph(download=True, create_using=Graph)
assert not G.is_directed()
G = dataset.get_dask_graph(download=True, create_using=Graph(directed=True))
assert G.is_directed()

# using a non-Graph type should raise an error
with pytest.raises(TypeError):
dataset.get_dask_graph(download=True, create_using=set)


def test_ctor_with_datafile():
from cugraph.datasets import karate
Expand Down
Loading