
[REVIEW]Fix out of index errors encountered with sampling on out of index samples #2825

Merged
merged 22 commits on Oct 20, 2022

Conversation

VibhuJawa
Member

@VibhuJawa VibhuJawa commented Oct 19, 2022

This PR does the following:

@VibhuJawa VibhuJawa added this to the 22.12 milestone Oct 19, 2022
@VibhuJawa VibhuJawa self-assigned this Oct 19, 2022
@VibhuJawa VibhuJawa added non-breaking Non-breaking change bug Something isn't working labels Oct 19, 2022
Comment on lines +21 to +25
class CuFeatureStorage:
    """
    Storage for node/edge feature data.
    """

Member Author

@VibhuJawa VibhuJawa Oct 19, 2022


Removed from the graph_store file; see below:

class CuFeatureStorage:
    """Storage for node/edge feature data.

    Either subclassing this class or implementing the same set of interfaces
    is fine. DGL simply uses duck-typing to implement its sampling pipeline.
    """

    def __init__(
        self, pg, columns, storage_type, backend_lib="torch", indices_offset=0
    ):
        self.pg = pg
        self.columns = columns
        if backend_lib == "torch":
            from torch.utils.dlpack import from_dlpack
        elif backend_lib == "tf":
            from tensorflow.experimental.dlpack import from_dlpack
        elif backend_lib == "cupy":
            from cupy import from_dlpack
        else:
            raise NotImplementedError(
                "Only pytorch, tensorflow and cupy backends are currently supported"
            )
        if storage_type not in ["edge", "node"]:
            raise NotImplementedError("Only edge and node storage is supported")
        self.storage_type = storage_type
        self.from_dlpack = from_dlpack
        self.indices_offset = indices_offset

    def fetch(self, indices, device=None, pin_memory=False, **kwargs):
        """Fetch the features of the given node/edge IDs to the
        given device.

        Parameters
        ----------
        indices : Tensor
            Node or edge IDs.
        device : Device
            Device context.
        pin_memory :

        Returns
        -------
        Tensor
            Feature data stored in PyTorch Tensor.
        """
        # Default implementation uses synchronous fetch.
        indices = cp.asarray(indices)
        if isinstance(self.pg, MGPropertyGraph):
            # dask_cudf loc breaks if we provide cudf series/cupy array
            # https://github.com/rapidsai/cudf/issues/11877
            indices = indices.get()
        else:
            indices = cudf.Series(indices)

        indices = indices + self.indices_offset

        if self.storage_type == "node":
            subset_df = self.pg.get_vertex_data(
                vertex_ids=indices, columns=self.columns
            )
        else:
            subset_df = self.pg.get_edge_data(edge_ids=indices, columns=self.columns)

        subset_df = subset_df[self.columns]
        if isinstance(subset_df, dask_cudf.DataFrame):
            subset_df = subset_df.compute()

        if len(subset_df) == 0:
            raise ValueError(f"{indices=} not found in FeatureStorage")

        cap = subset_df.to_dlpack()
        tensor = self.from_dlpack(cap)
        del cap
        if device:
            if not isinstance(tensor, cp.ndarray):
                # Can't transfer to a different device for cupy tensors
                tensor = tensor.to(device)
        return tensor
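As a minimal illustration of the fetch pattern above (offset the incoming indices, look up the rows, and fail loudly on an empty result), here is a pure-Python sketch; `SimpleFeatureStorage` is a hypothetical stand-in for illustration only, not part of this PR or cuGraph:

```python
# Hypothetical stand-in for CuFeatureStorage: a dict plays the role of the
# PropertyGraph and plain lists play the role of device tensors.
class SimpleFeatureStorage:
    def __init__(self, table, columns, indices_offset=0):
        self.table = table              # {row_id: {column: value}}
        self.columns = columns
        self.indices_offset = indices_offset

    def fetch(self, indices):
        # Shift the caller's ids by the offset, as fetch() does above.
        indices = [i + self.indices_offset for i in indices]
        rows = [self.table[i] for i in indices if i in self.table]
        # Mirror the empty-lookup ValueError in CuFeatureStorage.fetch.
        if not rows:
            raise ValueError(f"{indices=} not found in FeatureStorage")
        return [[row[c] for c in self.columns] for row in rows]

store = SimpleFeatureStorage(
    {10: {"x": 1.0}, 11: {"x": 2.0}}, ["x"], indices_offset=10
)
print(store.fetch([0, 1]))  # [[1.0], [2.0]]
```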

Member Author

@VibhuJawa VibhuJawa left a comment


Added comments to help reviewers see where each piece of code is coming from.

Comment on lines +17 to +22
def _update_feature_map(
    pg_feature_map, feat_name_obj, contains_vector_features, columns
):
    """
    Update the existing feature map `pg_feature_map` based on `feat_name_obj`
    """
Member Author


Just moved from the graph_store file

def _update_feature_map(
    pg_feature_map, feat_name_obj, contains_vector_features, columns
):
    if contains_vector_features:
        if feat_name_obj is None:
            raise ValueError(
                "feature name must be provided when wrapping"
                + " multiple columns under a single feature name"
                + " or a feature map"
            )
        if isinstance(feat_name_obj, str):
            pg_feature_map[feat_name_obj] = columns
        elif isinstance(feat_name_obj, dict):
            covered_columns = []
            for col in feat_name_obj.keys():
                current_cols = feat_name_obj[col]
                # Handle strings too
                if isinstance(current_cols, str):
                    current_cols = [current_cols]
                covered_columns = covered_columns + current_cols

            if set(covered_columns) != set(columns):
                raise ValueError(
                    f"All the columns {columns} not covered in {covered_columns} "
                    f"Please check the feature_map {feat_name_obj} provided"
                )

            for key, cols in feat_name_obj.items():
                if isinstance(cols, str):
                    cols = [cols]
                pg_feature_map[key] = cols
        else:
            raise ValueError(f"{feat_name_obj} should be str or dict")
    else:
        if feat_name_obj:
            raise ValueError(
                f"feat_name {feat_name_obj} is only valid when "
                "wrapping multiple columns under feature names"
            )
        for col in columns:
            pg_feature_map[col] = [col]
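Since `_update_feature_map` needs no RAPIDS dependencies, its three branches are easy to exercise standalone. The sketch below copies the helper and shows the resulting feature maps for each branch:

```python
def _update_feature_map(
    pg_feature_map, feat_name_obj, contains_vector_features, columns
):
    """Update `pg_feature_map` in place based on `feat_name_obj`."""
    if contains_vector_features:
        if feat_name_obj is None:
            raise ValueError(
                "feature name must be provided when wrapping"
                " multiple columns under a single feature name"
                " or a feature map"
            )
        if isinstance(feat_name_obj, str):
            pg_feature_map[feat_name_obj] = columns
        elif isinstance(feat_name_obj, dict):
            covered_columns = []
            for col in feat_name_obj.keys():
                current_cols = feat_name_obj[col]
                # Handle strings too
                if isinstance(current_cols, str):
                    current_cols = [current_cols]
                covered_columns = covered_columns + current_cols
            if set(covered_columns) != set(columns):
                raise ValueError(
                    f"All the columns {columns} not covered in {covered_columns} "
                    f"Please check the feature_map {feat_name_obj} provided"
                )
            for key, cols in feat_name_obj.items():
                if isinstance(cols, str):
                    cols = [cols]
                pg_feature_map[key] = cols
        else:
            raise ValueError(f"{feat_name_obj} should be str or dict")
    else:
        if feat_name_obj:
            raise ValueError(
                f"feat_name {feat_name_obj} is only valid when "
                "wrapping multiple columns under feature names"
            )
        for col in columns:
            pg_feature_map[col] = [col]

# No vector features: each column becomes its own single-column feature.
fm = {}
_update_feature_map(fm, None, False, ["age", "rank"])
print(fm)   # {'age': ['age'], 'rank': ['rank']}

# One feature name wrapping several columns.
fm2 = {}
_update_feature_map(fm2, "feat", True, ["x", "y"])
print(fm2)  # {'feat': ['x', 'y']}

# A feature map; every column must be covered or a ValueError is raised.
fm3 = {}
_update_feature_map(fm3, {"a": "x", "b": ["y", "z"]}, True, ["x", "y", "z"])
print(fm3)  # {'a': ['x'], 'b': ['y', 'z']}
```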

vid_n = PropertyGraph.vertex_col_name


def get_subgraph_and_src_range_from_edgelist(edge_list, is_mg, reverse_edges=False):
Member Author


Just moved from this file:

def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False):
    if reverse_edges:
        edge_list = edge_list.rename(columns={src_n: dst_n, dst_n: src_n})

    subgraph = cugraph.MultiGraph(directed=True)
    if is_mg:
        # FIXME: Can not switch to renumber = False
        # for MNMG algos
        # Remove when https://github.com/rapidsai/cugraph/issues/2437
        # lands
        create_subgraph_f = subgraph.from_dask_cudf_edgelist
        renumber = True
    else:
        # Note: We have to keep renumber = False
        # to handle cases when the seed_nodes are not present in the subgraph
        create_subgraph_f = subgraph.from_cudf_edgelist
        renumber = False

    create_subgraph_f(
        edge_list,
        source=src_n,
        destination=dst_n,
        edge_attr=eid_n,
        renumber=renumber,
        # FIXME: renumber=False is not supported for MNMG algos
        legacy_renum_only=True,
    )
    return subgraph

Comment on lines 103 to 133
def sample_single_sg(
    sg,
    sample_f,
    start_list,
    start_list_dtype,
    start_list_range,
    fanout,
    with_replacement,
):
    if isinstance(start_list, dict):
        start_list = cudf.concat(list(start_list.values()))

    # Uniform sampling fails when the dtype
    # of the seed list is not the same as the node dtype
    start_list = start_list.astype(start_list_dtype)

    # Filter start list by range
    # https://github.com/rapidsai/cugraph/blob/branch-22.12/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh
    start_list = start_list[
        (start_list >= start_list_range[0]) & (start_list <= start_list_range[1])
    ]

    sampled_df = sample_f(
        sg,
        start_list=start_list,
        fanout_vals=[fanout],
        with_replacement=with_replacement,
        # FIXME: is_edge_ids=True does not seem to do anything
        # issue: https://github.com/rapidsai/cugraph/issues/2562
    )
    return sampled_df
Member Author


Moved from graph_store:

def sample_single_sg(
    sg, sample_f, start_list, start_list_dtype, fanout, with_replacement
):
    if isinstance(start_list, dict):
        start_list = cudf.concat(list(start_list.values()))
    # Uniform sampling fails when the dtype
    # of the seed list is not the same as the node dtype
    start_list = start_list.astype(start_list_dtype)
    sampled_df = sample_f(
        sg,
        start_list=start_list,
        fanout_vals=[fanout],
        with_replacement=with_replacement,
        # FIXME: is_edge_ids=True does not seem to do anything
        # issue: https://github.com/rapidsai/cugraph/issues/2562
    )
    return sampled_df

Comment on lines +152 to +164
def create_dlpack_d(d):
    dlpack_d = {}
    for k, df in d.items():
        if len(df) == 0:
            dlpack_d[k] = (None, None, None)
        else:
            dlpack_d[k] = (
                df[src_n].to_dlpack(),
                df[dst_n].to_dlpack(),
                df[eid_n].to_dlpack(),
            )

    return dlpack_d
Member Author


Moved from cugraph_store and renamed to create_dlpack_d.

def return_dlpack_d(d):
    dlpack_d = {}
    for k, df in d.items():
        if len(df) == 0:
            dlpack_d[k] = (None, None, None)
        else:
            dlpack_d[k] = (
                df[src_n].to_dlpack(),
                df[dst_n].to_dlpack(),
                df[eid_n].to_dlpack(),
            )
    return dlpack_d

Comment on lines +144 to +149
def _convert_can_etype_s_to_tup(canonical_etype_s):
    src_type, etype, dst_type = canonical_etype_s.split(",")
    src_type = src_type[2:-1]
    dst_type = dst_type[2:-2]
    etype = etype[2:-1]
    return (src_type, etype, dst_type)
Member Author


Moved from graph_store; see below:

def _convert_can_etype_s_to_tup(canonical_etype_s):
    src_type, etype, dst_type = canonical_etype_s.split(",")
    src_type = src_type[2:-1]
    dst_type = dst_type[2:-2]
    etype = etype[2:-1]
    return (src_type, etype, dst_type)

Comment on lines +136 to +141
def _edge_types_contains_canonical_etype(can_etype, edge_types, edge_dir):
    src_type, _, dst_type = can_etype
    if edge_dir == "in":
        return dst_type in edge_types
    else:
        return src_type in edge_types
Member Author


Moved from graph_store:

def _edge_types_contains_canonical_etype(can_etype, edge_types, edge_dir):
    src_type, _, dst_type = can_etype
    if edge_dir == "in":
        return dst_type in edge_types
    else:
        return src_type in edge_types

Comment on lines +171 to +183
# FIXME: Remove after we have consistent naming
# https://github.com/rapidsai/cugraph/issues/2618
sg_columns = sg.edgelist.edgelist_df.columns
if "src" in sg_columns:
    # src for single node graph
    sg_node_dtype = sg.edgelist.edgelist_df["src"].dtype
elif src_n in sg_columns:
    # _SRC_ for multi-node graphs
    sg_node_dtype = sg.edgelist.edgelist_df[src_n].dtype
else:
    raise ValueError(f"Source column {src_n} not found in the subgraph")

return sg_node_dtype
Member Author


Moved from cugraph_store and put in a function.

sg_columns = sg.edgelist.edgelist_df.columns
if "src" in sg_columns:
    # src for single node graph
    self._sg_node_dtype = sg.edgelist.edgelist_df["src"].dtype
elif src_n in sg_columns:
    # _SRC_ for multi-node graphs
    self._sg_node_dtype = sg.edgelist.edgelist_df[src_n].dtype
else:
    raise ValueError(f"Source column {src_n} not found in the subgraph")

Comment on lines +64 to +72
def sample_multiple_sgs(
    sgs,
    sample_f,
    start_list_d,
    start_list_dtype,
    edge_dir,
    fanout,
    with_replacement,
):
Member Author


Moved from graph_store:

def sample_multiple_sgs(
    sgs,
    sample_f,
    start_list_d,
    start_list_dtype,
    edge_dir,
    fanout,
    with_replacement,
):
    start_list_types = list(start_list_d.keys())
    output_dfs = []
    for can_etype, sg in sgs.items():
        can_etype = _convert_can_etype_s_to_tup(can_etype)
        if _edge_types_contains_canonical_etype(can_etype, start_list_types, edge_dir):
            if edge_dir == "in":
                subset_type = can_etype[2]
            else:
                subset_type = can_etype[0]
            output = sample_single_sg(
                sg,
                sample_f,
                start_list_d[subset_type],
                start_list_dtype,
                fanout,
                with_replacement,
            )
            output_dfs.append(output)

    if len(output_dfs) == 0:
        empty_df = cudf.DataFrame({"sources": [], "destinations": [], "indices": []})
        return empty_df.astype(cp.int32)

    if isinstance(output_dfs[0], dask_cudf.DataFrame):
        return dask_cudf.concat(output_dfs, ignore_index=True)
    else:
        return cudf.concat(output_dfs, ignore_index=True)

Comment on lines +103 to +111
def sample_single_sg(
    sg,
    sample_f,
    start_list,
    start_list_dtype,
    start_list_range,
    fanout,
    with_replacement,
):
Member Author


Moved from:

def sample_single_sg(
    sg, sample_f, start_list, start_list_dtype, fanout, with_replacement
):
    if isinstance(start_list, dict):
        start_list = cudf.concat(list(start_list.values()))
    # Uniform sampling fails when the dtype
    # of the seed list is not the same as the node dtype
    start_list = start_list.astype(start_list_dtype)
    sampled_df = sample_f(
        sg,
        start_list=start_list,
        fanout_vals=[fanout],
        with_replacement=with_replacement,
        # FIXME: is_edge_ids=True does not seem to do anything
        # issue: https://github.com/rapidsai/cugraph/issues/2562
    )
    return sampled_df

@VibhuJawa VibhuJawa changed the title [WIP]This PR fixes out of index errors encountered when running sampling on out of index frames [WIP]Fix out of index errors encountered with sampling on out of index samples Oct 19, 2022
Comment on lines +118 to +124
# Filter start list by range
# to ensure the seeds are within the index values
# see below:
# https://github.com/rapidsai/cugraph/blob/branch-22.12/cpp/src/prims/per_v_random_select_transform_outgoing_e.cuh
start_list = start_list[
    (start_list >= start_list_range[0]) & (start_list <= start_list_range[1])
]
Member Author


This is the logic change in this PR to fix #2828. Also see the test added in this PR.
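The fix drops any seed that falls outside the subgraph's vertex range before the sampler is called, so the C++ prim never sees an out-of-index id. A pure-Python sketch of the same filter (a list standing in for a cudf Series; `filter_start_list` is a hypothetical name for illustration):

```python
def filter_start_list(start_list, start_list_range):
    # Keep only seeds inside the inclusive [lo, hi] vertex range.
    lo, hi = start_list_range
    return [seed for seed in start_list if lo <= seed <= hi]

print(filter_start_list([0, 5, 42, 99, 1000], (0, 99)))  # [0, 5, 42, 99]
```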

@VibhuJawa VibhuJawa changed the title [WIP]Fix out of index errors encountered with sampling on out of index samples [REVIEW]Fix out of index errors encountered with sampling on out of index samples Oct 20, 2022
@VibhuJawa VibhuJawa marked this pull request as ready for review October 20, 2022 01:18
@VibhuJawa VibhuJawa requested a review from a team as a code owner October 20, 2022 01:18
Contributor

@rlratzel rlratzel left a comment


Just a couple of minor questions/suggestions.

VibhuJawa and others added 3 commits October 20, 2022 08:41
Co-authored-by: Rick Ratzel <3039903+rlratzel@users.noreply.github.com>
Co-authored-by: Rick Ratzel <3039903+rlratzel@users.noreply.github.com>
@rlratzel
Contributor

@gpucibot merge
