Skip to content

Commit

Permalink
[BUG] Fix Incorrect File Selection in cuGraph-PyG Loader (#3599)
Browse files Browse the repository at this point in the history
The current version of the cuGraph-PyG loader does not properly handle the case where samples were previously computed and saved to disk.  It also compares the batch ids to determine when it is done processing, which is also incorrect, since batches can be loaded in any order.

This PR updates the cuGraph-PyG loader to use only the list of input files to determine when it is done.  It also allows specifying the input file list to support cases where previously-computed batches are loaded from disk.  The loader tests are updated to verify the new behavior and ensure that loading previously-computed batches works as expected.

Authors:
  - Alex Barghi (https://github.com/alexbarghi-nv)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Brad Rees (https://github.com/BradReesWork)

URL: #3599
  • Loading branch information
alexbarghi-nv authored May 24, 2023
1 parent 28dc2eb commit ab39563
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 17 deletions.
45 changes: 28 additions & 17 deletions python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ def __init__(
self,
feature_store: CuGraphStore,
graph_store: CuGraphStore,
input_nodes: Union[InputNodes, int] = None,
input_nodes: InputNodes = None,
batch_size: int = 0,
shuffle=False,
shuffle: bool = False,
edge_types: Sequence[Tuple[str]] = None,
directory=None,
starting_batch_id=0,
batches_per_partition=100,
directory: Union[str, tempfile.TemporaryDirectory] = None,
input_files: List[str] = None,
starting_batch_id: int = 0,
batches_per_partition: int = 100,
# Sampler args
num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]] = None,
replace: bool = True,
Expand All @@ -69,9 +70,9 @@ def __init__(
graph_store: CuGraphStore
The graph store containing the graph structure.
input_nodes: Union[InputNodes, int]
input_nodes: InputNodes
The input nodes associated with this sampler.
If this is an integer N, this loader will load N batches
If None, this loader will load batches
from disk rather than performing sampling in memory.
batch_size: int
Expand All @@ -92,10 +93,13 @@ def __init__(
The path of the directory to write samples to.
Defaults to a new generated temporary directory.
input_files: List[str] (optional, default=None)
The input files to read from the directory containing
samples. This argument is only used when loading
already-sampled batches from disk.
starting_batch_id: int (optional, default=0)
The starting id for each batch. Defaults to 0.
Generally used when loading previously-sampled
batches from disk.
batches_per_partition: int (optional, default=100)
The number of batches in each output partition.
Expand All @@ -121,11 +125,17 @@ def __init__(
self.__batches_per_partition = batches_per_partition
self.__starting_batch_id = starting_batch_id

if isinstance(input_nodes, int):
if input_nodes is None:
# Will be loading from disk
self.__num_batches = input_nodes
self.__directory = directory
iter(os.listdir(self.__directory))
if input_files is None:
if isinstance(self.__directory, str):
self.__input_files = iter(os.listdir(self.__directory))
else:
self.__input_files = iter(os.listdir(self.__directory.name))
else:
self.__input_files = iter(input_files)
return

input_type, input_nodes = torch_geometric.loader.utils.get_input_nodes(
Expand Down Expand Up @@ -201,7 +211,13 @@ def __next__(self):
)

# Will raise StopIteration if there are no files left
fname = next(self.__input_files)
try:
fname = next(self.__input_files)
except StopIteration as ex:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None
raise StopIteration(ex)

m = self.__ex_parquet_file.match(fname)
if m is None:
Expand Down Expand Up @@ -234,12 +250,7 @@ def __next__(self):
)

# Get ready for next iteration
# If there is no next iteration, make sure results are deleted
self.__next_batch += 1
if self.__next_batch >= self.__num_batches + self.__starting_batch_id:
# Won't delete a non-temp dir (since it would just be deleting a string)
del self.__directory
self.__directory = None

# Get and return the sampled subgraph
if isinstance(torch_geometric, MissingModule):
Expand Down
93 changes: 93 additions & 0 deletions python/cugraph-pyg/cugraph_pyg/tests/test_cugraph_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@

import pytest

import tempfile
import os

import cudf
import cupy

from cugraph_pyg.loader import CuGraphNeighborLoader
from cugraph_pyg.loader import BulkSampleLoader
from cugraph_pyg.data import CuGraphStore
from cugraph.gnn import FeatureStore
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
Expand Down Expand Up @@ -70,3 +78,88 @@ def test_cugraph_loader_hetero(karate_gnn):
if "type1" in sample:
for prop in sample["type1"]["prop0"].tolist():
assert prop % 41 == 0


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_from_disk():
    """Verify the loader can consume previously-written sample batches from disk."""
    # Minimal single-vertex-type store: 7 nodes of type "t0" with feature "x".
    feature_store = FeatureStore()
    feature_store.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")

    edge_counts = {("t0", "knows", "t0"): 7}
    node_counts = {"t0": 7}
    cugraph_store = CuGraphStore(feature_store, edge_counts, node_counts)

    # Hand-crafted sampler output mimicking what the bulk sampler writes.
    bogus_samples = cudf.DataFrame(
        {
            "sources": [0, 1, 2, 3, 4, 5, 6],
            "destinations": [6, 4, 3, 2, 2, 1, 5],
            "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"),
            "edge_id": [5, 10, 15, 20, 25, 30, 35],
            "hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"),
        }
    )

    # Persist 256 identical batches, one parquet file per batch id.
    tempdir = tempfile.TemporaryDirectory()
    for s in range(256):
        bogus_samples["batch_id"] = cupy.int32(s)
        out_path = os.path.join(tempdir.name, f"batch={s}-{s}.parquet")
        bogus_samples.to_parquet(out_path)

    loader = BulkSampleLoader(
        feature_store=cugraph_store,
        graph_store=cugraph_store,
        directory=tempdir,
    )

    n_seen = 0
    for sample in loader:
        n_seen += 1
        assert sample["t0"]["num_nodes"] == 7
        # correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6]
        assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6]
        assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7]

    # Every file in the directory should have been consumed exactly once.
    assert n_seen == 256


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_from_disk_subset():
    """Verify ``input_files`` restricts the loader to a subset of on-disk batches."""
    # Minimal single-vertex-type store: 7 nodes of type "t0" with feature "x".
    feature_store = FeatureStore()
    feature_store.add_data(torch.tensor([1, 2, 3, 4, 5, 6, 7]), "t0", "x")

    edge_counts = {("t0", "knows", "t0"): 7}
    node_counts = {"t0": 7}
    cugraph_store = CuGraphStore(feature_store, edge_counts, node_counts)

    # Hand-crafted sampler output mimicking what the bulk sampler writes.
    bogus_samples = cudf.DataFrame(
        {
            "sources": [0, 1, 2, 3, 4, 5, 6],
            "destinations": [6, 4, 3, 2, 2, 1, 5],
            "edge_type": cudf.Series([0, 0, 0, 0, 0, 0, 0], dtype="int32"),
            "edge_id": [5, 10, 15, 20, 25, 30, 35],
            "hop_id": cudf.Series([0, 0, 0, 1, 1, 2, 2], dtype="int32"),
        }
    )

    # Persist 256 identical batches, one parquet file per batch id.
    tempdir = tempfile.TemporaryDirectory()
    for s in range(256):
        bogus_samples["batch_id"] = cupy.int32(s)
        out_path = os.path.join(tempdir.name, f"batch={s}-{s}.parquet")
        bogus_samples.to_parquet(out_path)

    # Only hand the loader 100 of the 256 files.
    subset = list(os.listdir(tempdir.name))[100:200]
    loader = BulkSampleLoader(
        feature_store=cugraph_store,
        graph_store=cugraph_store,
        directory=tempdir,
        input_files=subset,
    )

    n_seen = 0
    for sample in loader:
        n_seen += 1
        assert sample["t0"]["num_nodes"] == 7
        # correct vertex order is [0, 1, 2, 6, 4, 3, 5]; x = [1, 2, 3, 7, 5, 4, 6]
        assert sample["t0"]["x"].tolist() == [1, 2, 3, 7, 5, 4, 6]
        assert list(sample[("t0", "knows", "t0")]["edge_index"].shape) == [2, 7]

    # Exactly the explicitly-listed files should have been consumed.
    assert n_seen == 100

0 comments on commit ab39563

Please sign in to comment.