Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fix Graph Construction From Pandas in cuGraph-PyG #3985

Merged
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
e2b4764
fix large pandas dataframe
alexbarghi-nv Nov 1, 2023
f9305e5
c
alexbarghi-nv Nov 6, 2023
f1ef95f
fix loader bugs
alexbarghi-nv Nov 6, 2023
d5da756
test updates
alexbarghi-nv Nov 6, 2023
77c4525
style
alexbarghi-nv Nov 6, 2023
1a6124f
update dependencies.yaml
alexbarghi-nv Nov 7, 2023
c6a9724
Merge branch 'fix-loader-bugs' into fix-large-pandas-dataframe
alexbarghi-nv Nov 7, 2023
e207f98
Merge branch 'branch-23.12' into fix-loader-bugs
alexbarghi-nv Nov 7, 2023
20e4819
Merge branch 'branch-23.12' into fix-large-pandas-dataframe
alexbarghi-nv Nov 7, 2023
cf93084
style
alexbarghi-nv Nov 7, 2023
d7eb7d7
Merge branch 'fix-large-pandas-dataframe' of https://github.com/alexb…
alexbarghi-nv Nov 7, 2023
f619931
update test
alexbarghi-nv Nov 7, 2023
0f4693c
attempt to fix package resolution issue
alexbarghi-nv Nov 8, 2023
65f50a3
Merge branch 'fix-loader-bugs' of https://github.com/alexbarghi-nv/cu…
alexbarghi-nv Nov 8, 2023
9b7901c
Merge branch 'branch-23.12' into fix-loader-bugs
alexbarghi-nv Nov 8, 2023
9713cee
Add ucx to cugraph recipe
Nov 9, 2023
bc14fd7
Add ucx to libcugraph recipe
Nov 9, 2023
2064c37
Update conda/recipes/libcugraph/meta.yaml
naimnv Nov 9, 2023
b23a9d5
Merge branch 'fix_build' of https://github.com/naimnv/cugraph-forked …
alexbarghi-nv Nov 10, 2023
9384bb6
Merge branch 'fix-loader-bugs' of https://github.com/alexbarghi-nv/cu…
alexbarghi-nv Nov 10, 2023
11ff5bf
fix typing
alexbarghi-nv Nov 10, 2023
7b9396c
replace manual exchange with flip
alexbarghi-nv Nov 10, 2023
ede8399
Merge branch 'fix-loader-bugs' of https://github.com/alexbarghi-nv/cu…
alexbarghi-nv Nov 10, 2023
d1b5f22
various changes to get ci working
alexbarghi-nv Nov 11, 2023
e941e10
generator
alexbarghi-nv Nov 11, 2023
3f7ec7f
fix pylibcugraphops version, simplify test_python.sh
alexbarghi-nv Nov 12, 2023
46c4460
remove unwanted files
alexbarghi-nv Nov 12, 2023
a080f5a
Merge branch 'fix-loader-bugs' into fix-large-pandas-dataframe
alexbarghi-nv Nov 12, 2023
d229de4
remove strict csr/csr check
alexbarghi-nv Nov 12, 2023
55516bd
remove strict format check
alexbarghi-nv Nov 12, 2023
d1334ac
test correction
alexbarghi-nv Nov 14, 2023
2706d87
style
alexbarghi-nv Nov 14, 2023
d8e5eea
resolve merge conflict
alexbarghi-nv Nov 16, 2023
b9a9af3
revert meta changes
alexbarghi-nv Nov 16, 2023
4c9b73b
resolve merge conflict
alexbarghi-nv Nov 16, 2023
dcbf1e4
Merge branch 'branch-23.12' into fix-large-pandas-dataframe
alexbarghi-nv Nov 17, 2023
4850c29
Merge branch 'branch-23.12' into fix-large-pandas-dataframe
alexbarghi-nv Nov 17, 2023
adc94c5
remove unnecessary argument from function
alexbarghi-nv Nov 17, 2023
821bbf5
Merge branch 'fix-large-pandas-dataframe' of https://github.com/alexb…
alexbarghi-nv Nov 17, 2023
cf074d1
Merge branch 'branch-23.12' into fix-large-pandas-dataframe
alexbarghi-nv Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 56 additions & 21 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
import cugraph
import warnings

from cugraph.utilities.utils import import_optional, MissingModule
import dask.array as dar
import dask.dataframe as dd
import dask.distributed as distributed
import dask_cudf

dd = import_optional("dask.dataframe")
distributed = import_optional("dask.distributed")
dask_cudf = import_optional("dask_cudf")
from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
torch_geometric = import_optional("torch_geometric")
Expand Down Expand Up @@ -367,6 +368,15 @@ def __infer_offsets(
}
)

def __dask_array_from_numpy(
    self, array: np.ndarray, client: distributed.client.Client, npartitions: int
):
    """
    Wrap a NumPy array in a dask array.

    Parameters
    ----------
    array: np.ndarray
        The host array to wrap.
    client: distributed.client.Client
        Accepted for call-site compatibility; not referenced in this body.
    npartitions: int
        Divisor used to derive the chunk length passed to dask.

    Returns
    -------
    The result of ``dar.from_array`` for ``array``, with an empty array of
    the same dtype supplied as ``meta``.
    """
    # Empty array of the input dtype, handed to dask as the meta object.
    empty_meta = np.array([], dtype=array.dtype)

    # Integer division can yield 0 when the array has fewer elements than
    # npartitions; clamp so the chunk length is always at least 1.
    chunk_length = len(array) // npartitions
    if chunk_length < 1:
        chunk_length = 1

    return dar.from_array(array, meta=empty_meta, chunks=chunk_length)

def __construct_graph(
self,
edge_info: Dict[Tuple[str, str, str], List[TensorType]],
Expand Down Expand Up @@ -464,32 +474,42 @@ def __construct_graph(
]
)

df = pandas.DataFrame(
{
"src": pandas.Series(na_dst)
if order == "CSC"
else pandas.Series(na_src),
"dst": pandas.Series(na_src)
if order == "CSC"
else pandas.Series(na_dst),
"etp": pandas.Series(na_etp),
}
)
vertex_dtype = df.src.dtype
vertex_dtype = na_src.dtype

if multi_gpu:
nworkers = len(distributed.get_client().scheduler_info()["workers"])
df = dd.from_pandas(df, npartitions=nworkers if len(df) > 32 else 1)
client = distributed.get_client()
nworkers = len(client.scheduler_info()["workers"])
npartitions = nworkers * 4

src_dar = self.__dask_array_from_numpy(na_src, client, npartitions)
del na_src

dst_dar = self.__dask_array_from_numpy(na_dst, client, npartitions)
del na_dst

etp_dar = self.__dask_array_from_numpy(na_etp, client, npartitions)
del na_etp

df = dd.from_dask_array(etp_dar, columns=["etp"])
df["src"] = dst_dar if order == "CSC" else src_dar
df["dst"] = src_dar if order == "CSC" else dst_dar

del src_dar
del dst_dar
del etp_dar

if df.etp.dtype != "int32":
raise ValueError("Edge type must be int32!")
tingyu66 marked this conversation as resolved.
Show resolved Hide resolved

# Ensure the dataframe is constructed on each partition
# instead of adding additional synchronization overhead from potential
# host to device copies.
def get_empty_df():
return cudf.DataFrame(
{
"etp": cudf.Series([], dtype="int32"),
"src": cudf.Series([], dtype=vertex_dtype),
"dst": cudf.Series([], dtype=vertex_dtype),
"etp": cudf.Series([], dtype="int32"),
}
)

Expand All @@ -500,9 +520,23 @@ def get_empty_df():
if len(f) > 0
else get_empty_df(),
meta=get_empty_df(),
).reset_index(drop=True)
).reset_index(
drop=True
) # should be ok for dask
else:
df = cudf.from_pandas(df).reset_index(drop=True)
df = pandas.DataFrame(
{
"src": pandas.Series(na_dst)
if order == "CSC"
else pandas.Series(na_src),
"dst": pandas.Series(na_src)
if order == "CSC"
else pandas.Series(na_dst),
"etp": pandas.Series(na_etp),
}
)
df = cudf.from_pandas(df)
df.reset_index(drop=True, inplace=True)

graph = cugraph.MultiGraph(directed=True)
if multi_gpu:
Expand All @@ -521,6 +555,7 @@ def get_empty_df():
edge_type="etp",
)

del df
return graph

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from cugraph_pyg.loader import CuGraphNeighborLoader
from cugraph_pyg.data import CuGraphStore

from cugraph.utilities.utils import import_optional, MissingModule

torch = import_optional("torch")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,3 +386,29 @@ def test_mg_frame_handle(graph, dask_client):
F, G, N = graph
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
assert isinstance(cugraph_store._EXPERIMENTAL__CuGraphStore__graph._plc_graph, dict)


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_large_index(dask_client):
    """
    Build a multi-GPU CuGraphStore from a large NumPy edge index and check
    the edge list reported by the resulting graph against the input.

    ``dask_client`` is requested as a fixture; it is not referenced directly
    in the body.
    """
    n_vertices = 1_000_000
    n_edges = 100_000_000

    # Random (source, destination) vertex ids for a single edge type.
    src_ids = np.random.randint(0, n_vertices, (n_edges,))
    dst_ids = np.random.randint(0, n_vertices, (n_edges,))
    large_index = (src_ids, dst_ids)

    # One integer feature value per vertex, stored under type "N", name "f".
    feature_store = cugraph.gnn.FeatureStore(backend="torch")
    feature_store.add_data(np.random.randint(0, 50, (n_vertices,)), "N", "f")

    store = CuGraphStore(
        feature_store,
        {("N", "e", "N"): large_index},
        {"N": n_vertices},
        multi_gpu=True,
    )

    graph = store._subgraph()
    assert isinstance(graph, cugraph.Graph)

    edge_list = graph.view_edge_list().compute()

    # Each column passes when its element-wise differences from the input
    # sum to zero.
    for column, expected in zip(("src", "dst"), large_index):
        assert (edge_list[column].values_host - expected).sum() == 0
Loading