[BUG] Fix Incorrect File Selection in cuGraph-PyG Loader #3599

Merged: 3 commits, May 24, 2023
Changes from all commits

45 changes: 28 additions & 17 deletions python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py
@@ -44,13 +44,14 @@ def __init__(
self,
feature_store: CuGraphStore,
graph_store: CuGraphStore,
input_nodes: Union[InputNodes, int] = None,
input_nodes: InputNodes = None,
batch_size: int = 0,
shuffle=False,
shuffle: bool = False,
edge_types: Sequence[Tuple[str]] = None,
directory=None,
starting_batch_id=0,
batches_per_partition=100,
directory: Union[str, tempfile.TemporaryDirectory] = None,
input_files: List[str] = None,
starting_batch_id: int = 0,
batches_per_partition: int = 100,
# Sampler args
num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]] = None,
replace: bool = True,
@@ -69,9 +70,9 @@ def __init__(
graph_store: CuGraphStore
The graph store containing the graph structure.

input_nodes: Union[InputNodes, int]
input_nodes: InputNodes
The input nodes associated with this sampler.
If this is an integer N, this loader will load N batches
If None, this loader will load batches
from disk rather than performing sampling in memory.

batch_size: int
@@ -92,10 +93,13 @@ def __init__(
The path of the directory to write samples to.
Defaults to a new generated temporary directory.

input_files: List[str] (optional, default=None)
The input files to read from the directory containing
samples. This argument is only used when loading
already-sampled batches from disk.

starting_batch_id: int (optional, default=0)
The starting id for each batch. Defaults to 0.
Generally used when loading previously-sampled
batches from disk.

batches_per_partition: int (optional, default=100)
The number of batches in each output partition.
@@ -121,11 +125,17 @@ def __init__(
self.__batches_per_partition = batches_per_partition
self.__starting_batch_id = starting_batch_id

if isinstance(input_nodes, int):
if input_nodes is None:
# Will be loading from disk
self.__num_batches = input_nodes
self.__directory = directory
iter(os.listdir(self.__directory))
if input_files is None:
if isinstance(self.__directory, str):
self.__input_files = iter(os.listdir(self.__directory))
else:
self.__input_files = iter(os.listdir(self.__directory.name))
else:
self.__input_files = iter(input_files)
return

input_type, input_nodes = torch_geometric.loader.utils.get_input_nodes(
@@ -201,7 +211,13 @@ def __next__(self):
)

# Will raise StopIteration if there are no files left
fname = next(self.__input_files)
try:
fname = next(self.__input_files)
except StopIteration as ex:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None
raise StopIteration(ex)

m = self.__ex_parquet_file.match(fname)
if m is None:
@@ -234,12 +250,7 @@ def __next__(self):
)

# Get ready for next iteration
# If there is no next iteration, make sure results are deleted
self.__next_batch += 1
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None

# Get and return the sampled subgraph
if isinstance(torch_geometric, MissingModule):
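
For context, a minimal sketch (not part of the PR) of how the repaired disk-loading path is meant to be used. The store construction is copied from the tests below; the sample_dir path is hypothetical, and only constructor arguments that appear in this diff (feature_store, graph_store, directory, input_files) are used.

import os

import torch

from cugraph.gnn import FeatureStore
from cugraph_pyg.data import CuGraphStore
from cugraph_pyg.loader import BulkSampleLoader

# Build a tiny store, exactly as in the tests below.
F = FeatureStore()
F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")
G = {("t0", "knows", "t0"): 7}
N = {"t0": 7}
cugraph_store = CuGraphStore(F, G, N)

sample_dir = "/tmp/bulk_samples"  # hypothetical directory of batch=*.parquet files

# Leaving input_nodes as None (the new default) makes the loader read
# already-sampled batches from `directory` instead of sampling in memory.
loader = BulkSampleLoader(
    feature_store=cugraph_store,
    graph_store=cugraph_store,
    directory=sample_dir,  # may also be a tempfile.TemporaryDirectory
    # input_files=sorted(os.listdir(sample_dir))[:10],  # optional: read only a subset
)

for batch in loader:
    ...  # one sampled mini-batch per iteration
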
93 changes: 93 additions & 0 deletions python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py
@@ -13,8 +13,16 @@

import pytest

import tempfile
import os

import cudf
import cupy

from cugraph_pyg.loader import CuGraphNeighborLoader
from cugraph_pyg.loader import BulkSampleLoader
from cugraph_pyg.data import CuGraphStore
from cugraph.gnn import FeatureStore
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
@@ -70,3 +78,88 @@ def test_cugraph_loader_hetero(karate_gnn):
if "type1" in sample:
for prop in sample["type1"]["prop0"].tolist():
assert prop % 41 == 0


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_from_disk():
F = FeatureStore()
F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")

G = {("t0", "knows", "t0"): 7}
N = {"t0": 7}

cugraph_store = CuGraphStore(F, G, N)

bogus_samples = cudf.DataFrame(
{
"sources": [0, 1, 2, 3, 4, 5, 6],
"destinations": [6, 4, 3, 2, 2, 1, 5],
"edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"),
"edge_id": [5, 10, 15, 20, 25, 30, 35],
"hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"),
}
)

tempdir = tempfile.TemporaryDirectory()
for s in range(256):
bogus_samples["batch_id"] = cupy.int32(s)
bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet"))

loader = BulkSampleLoader(
feature_store=cugraph_store,
graph_store=cugraph_store,
directory=tempdir,
)

num_samples = 0
for sample in loader:
num_samples += 1
assert sample["t0"]["num_nodes"] == 7
# correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6]
assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6]
assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7]

assert num_samples == 256


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_from_disk_subset():
F = FeatureStore()
F.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")

G = {("t0", "knows", "t0"): 7}
N = {"t0": 7}

cugraph_store = CuGraphStore(F, G, N)

bogus_samples = cudf.DataFrame(
{
"sources": [0, 1, 2, 3, 4, 5, 6],
"destinations": [6, 4, 3, 2, 2, 1, 5],
"edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"),
"edge_id": [5, 10, 15, 20, 25, 30, 35],
"hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"),
}
)

tempdir = tempfile.TemporaryDirectory()
for s in range(256):
bogus_samples["batch_id"] = cupy.int32(s)
bogus_samples.to_parquet(os.path.join(tempdir.name, f"batch={s}-{s}.parquet"))

loader = BulkSampleLoader(
feature_store=cugraph_store,
graph_store=cugraph_store,
directory=tempdir,
input_files=list(os.listdir(tempdir.name))[100:200],
)

num_samples = 0
for sample in loader:
num_samples += 1
assert sample["t0"]["num_nodes"] == 7
# correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6]
assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6]
assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7]

assert num_samples == 100
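
A note on the cleanup added to __next__ above: it relies on standard tempfile semantics, namely that dropping the last reference to a tempfile.TemporaryDirectory removes the directory and its contents, while dropping a plain string path deletes nothing on disk. A standalone sketch of that behavior (the file name is only an illustration):

import os
import tempfile

tmp = tempfile.TemporaryDirectory()
path = tmp.name
open(os.path.join(path, "batch=0-0.parquet"), "w").close()

# Dropping the last reference runs the TemporaryDirectory finalizer
# (immediately under CPython's reference counting), removing the directory.
del tmp
print(os.path.exists(path))  # expected: False

# A plain string path is unaffected: deleting the name removes nothing on disk.
plain_dir = "/tmp/bulk_samples"  # hypothetical
del plain_dir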