diff --git a/analytical_engine/core/fragment/arrow_projected_fragment.h b/analytical_engine/core/fragment/arrow_projected_fragment.h
index 8a1e2f368110..8622453ade99 100644
--- a/analytical_engine/core/fragment/arrow_projected_fragment.h
+++ b/analytical_engine/core/fragment/arrow_projected_fragment.h
@@ -1708,7 +1708,7 @@ class ArrowProjectedFragment
     const int64_t* offset_values = offsets->raw_values();
     const int64_t* boffset_values = boffsets->raw_values();
     vineyard::parallel_for(
-        static_cast<vid_t>(0), fragment->tvnums_[v_label],
+        static_cast<vid_t>(0), fragment->ivnums_[v_label],
         [&](vid_t i) {
           int64_t begin = offset_values[i], end = offset_values[i + 1];
           int64_t bbegin = boffset_values[i], bend = boffset_values[i + 1];
diff --git a/analytical_engine/frame/property_graph_frame.cc b/analytical_engine/frame/property_graph_frame.cc
index 6ca408c963df..e5a62bafcc40 100644
--- a/analytical_engine/frame/property_graph_frame.cc
+++ b/analytical_engine/frame/property_graph_frame.cc
@@ -112,8 +112,6 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
 #ifdef ENABLE_GAR
     BOOST_LEAF_AUTO(graph_info_path,
                     params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));
-    BOOST_LEAF_ASSIGN(generate_eid, params.Get<bool>(gs::rpc::GENERATE_EID));
-    BOOST_LEAF_ASSIGN(retain_oid, params.Get<bool>(gs::rpc::RETAIN_OID));
     using loader_t = vineyard::gar_fragment_loader_t;
     loader_t loader(client, comm_spec, graph_info_path);
diff --git a/coordinator/gscoordinator/op_executor.py b/coordinator/gscoordinator/op_executor.py
index 7b43a1084760..6d457f87d304 100644
--- a/coordinator/gscoordinator/op_executor.py
+++ b/coordinator/gscoordinator/op_executor.py
@@ -726,6 +726,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
             "\n"
         )
         storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
+        serialization_options = json.loads(op.attr[types_pb2.SERIALIZATION_OPTIONS].s.decode())
         vineyard_endpoint = self._launcher.vineyard_endpoint
         vineyard_ipc_socket = self._launcher.vineyard_socket
         deployment, hosts = self._launcher.get_vineyard_stream_info()
@@ -738,6 +739,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
             vineyard_ipc_socket=vineyard_ipc_socket,
             vineyard_endpoint=vineyard_endpoint,
             storage_options=storage_options,
+            serialization_options=serialization_options,
             deployment=deployment,
             hosts=hosts,
         )
@@ -758,6 +760,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
             "\n"
         )
         storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
+        deserialization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode())
         vineyard_endpoint = self._launcher.vineyard_endpoint
         vineyard_ipc_socket = self._launcher.vineyard_socket
         deployment, hosts = self._launcher.get_vineyard_stream_info()
@@ -768,6 +771,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
             vineyard_ipc_socket=vineyard_ipc_socket,
             vineyard_endpoint=vineyard_endpoint,
             storage_options=storage_options,
+            deserialization_options=deserialization_options,
             deployment=deployment,
             hosts=hosts,
         )
diff --git a/proto/types.proto b/proto/types.proto
index 82d9f69e1811..b99d940ead74 100644
--- a/proto/types.proto
+++ b/proto/types.proto
@@ -249,6 +249,8 @@ enum ParamKey {
   FD = 323;  // file descriptor
   SOURCE = 324;
   WRITE_OPTIONS = 325;
+  SERIALIZATION_OPTIONS = 326;
+  DESERIALIZATION_OPTIONS = 327;

   // large attr
   CHUNK_NAME = 341;
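The two new ParamKey entries carry JSON-encoded option dicts from the client-side DAG to the coordinator. A minimal sketch of how such an attribute is packed and unpacked, following the pattern of dag_utils.py and op_executor.py in this patch (the option key "compressor" is hypothetical, purely for illustration):

import json

from graphscope.framework import utils
from graphscope.proto import types_pb2

# client side (as in dag_utils.serialize_graph): pack options into op attrs
serialization_options = {"compressor": "lz4"}  # hypothetical option
config = {
    types_pb2.SERIALIZATION_OPTIONS: utils.s_to_attr(
        json.dumps(serialization_options)
    ),
}

# coordinator side (as in op_executor._process_serialize_graph): decode back
attr = config[types_pb2.SERIALIZATION_OPTIONS]
decoded = json.loads(attr.s.decode())
assert decoded == serialization_options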
diff --git a/python/graphscope/__init__.py b/python/graphscope/__init__.py
index a3ed3ea76005..8af5851674ed 100644
--- a/python/graphscope/__init__.py
+++ b/python/graphscope/__init__.py
@@ -50,7 +50,6 @@ from graphscope.framework.errors import *
 from graphscope.framework.graph import Graph
 from graphscope.framework.graph_builder import load_from
-from graphscope.framework.graph_builder import load_from_gar
 from graphscope.version import __version__

 __doc__ = """
diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py
index 56e7b8faf26a..7fc6f23979bc 100755
--- a/python/graphscope/client/session.py
+++ b/python/graphscope/client/session.py
@@ -1162,12 +1162,6 @@ def load_from(self, *args, **kwargs):
         with default_session(self):
             return graphscope.load_from(*args, **kwargs)

-    def load_from_gar(self, *args, **kwargs):
-        """Load a graph from gar format files within the session.
-        See more information in :meth:`graphscope.load_from_gar`.
-        """
-        with default_session(self):
-            return graphscope.load_from_gar(*args, **kwargs)

     @deprecated("Please use `sess.interactive` instead.")
     def gremlin(self, graph, params=None):
diff --git a/python/graphscope/dataset/ldbc.py b/python/graphscope/dataset/ldbc.py
index fa91e5541352..f9e345303cc9 100644
--- a/python/graphscope/dataset/ldbc.py
+++ b/python/graphscope/dataset/ldbc.py
@@ -81,22 +81,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
             ["creationDate", "locationIP", "browserUsed", "content", "length"],
             "id",
         ),
-        "organisation": (
-            Loader(
-                os.path.join(prefix, "organisation_0_0.csv"),
-                header_row=True,
-                delimiter="|",
-            ),
-            ["type", "name", "url"],
-            "id",
-        ),
-        "tagclass": (
-            Loader(
-                os.path.join(prefix, "tagclass_0_0.csv"), header_row=True, delimiter="|"
-            ),
-            ["name", "url"],
-            "id",
-        ),
         "person": (
            Loader(
                 os.path.join(prefix, "person_0_0.csv"), header_row=True, delimiter="|"
             ),
             [
                 "firstName",
                 "lastName",
                 "gender",
                 "birthday",
                 "creationDate",
                 "locationIP",
                 "browserUsed",
             ],
             "id",
         ),
@@ -112,20 +96,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
-        "forum": (
-            Loader(
-                os.path.join(prefix, "forum_0_0.csv"), header_row=True, delimiter="|"
-            ),
-            ["title", "creationDate"],
-            "id",
-        ),
-        "place": (
-            Loader(
-                os.path.join(prefix, "place_0_0.csv"), header_row=True, delimiter="|"
-            ),
-            ["name", "url", "type"],
-            "id",
-        ),
         "post": (
             Loader(
                 os.path.join(prefix, "post_0_0.csv"), header_row=True, delimiter="|"
             ),
             [
                 "imageFile",
                 "creationDate",
                 "locationIP",
                 "browserUsed",
                 "language",
                 "content",
                 "length",
             ],
             "id",
         ),
@@ -141,11 +111,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
-        "tag": (
-            Loader(os.path.join(prefix, "tag_0_0.csv"), header_row=True, delimiter="|"),
-            ["name", "url"],
-            "id",
-        ),
     }
     edges = {
         "replyOf": [
             (
                 Loader(
                     os.path.join(prefix, "comment_replyOf_comment_0_0.csv"),
                     header_row=True,
                     delimiter="|",
                 ),
                 [],
                 ("Comment.id", "comment"),
                 ("Comment.id.1", "comment"),
             ),
             (
                 Loader(
                     os.path.join(prefix, "comment_replyOf_post_0_0.csv"),
                     header_row=True,
                     delimiter="|",
                 ),
                 [],
                 ("Comment.id", "comment"),
                 ("Post.id", "post"),
             ),
         ],
@@ -170,62 +135,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
-        "isPartOf": [
-            (
-                Loader(
-                    os.path.join(prefix, "place_isPartOf_place_0_0.csv"),
-                    header_row=True,
-                    delimiter="|",
-                ),
-                [],
-                ("Place.id", "place"),
-                ("Place.id.1", "place"),
-            )
-        ],
-        "isSubclassOf": [
-            (
-                Loader(
-                    os.path.join(prefix, "tagclass_isSubclassOf_tagclass_0_0.csv"),
-                    header_row=True,
-                    delimiter="|",
-                ),
-                [],
-                ("TagClass.id", "tagclass"),
-                ("TagClass.id.1", "tagclass"),
-            )
-        ],
-        "hasTag": [
-            (
-                Loader(
-                    os.path.join(prefix, "forum_hasTag_tag_0_0.csv"),
-                    header_row=True,
-                    delimiter="|",
-                ),
-                [],
-                ("Forum.id", "forum"),
-                ("Tag.id", "tag"),
-            ),
-            (
-                Loader(
-                    os.path.join(prefix, "comment_hasTag_tag_0_0.csv"),
-                    header_row=True,
-                    delimiter="|",
-                ),
-                [],
-                ("Comment.id", "comment"),
-                ("Tag.id", "tag"),
-            ),
-            (
-                Loader(
-                    os.path.join(prefix, "post_hasTag_tag_0_0.csv"),
-                    header_row=True,
-                    delimiter="|",
-                ),
-                [],
-                ("Post.id", "post"),
-                ("Tag.id", "tag"),
-            ),
-        ],
         "knows": [
             (
                 Loader(
                     os.path.join(prefix, "person_knows_person_0_0.csv"),
                     header_row=True,
                     delimiter="|",
                 ),
                 ["creationDate"],
                 ("Person.id", "person"),
                 ("Person.id.1", "person"),
             )
         ],
- "hasModerator": [ - ( - Loader( - os.path.join(prefix, "forum_hasModerator_person_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Forum.id", "forum"), - ("Person.id", "person"), - ) - ], - "hasInterest": [ - ( - Loader( - os.path.join(prefix, "person_hasInterest_tag_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Person.id", "person"), - ("Tag.id", "tag"), - ) - ], - "isLocatedIn": [ - ( - Loader( - os.path.join(prefix, "post_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Post.id", "post"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "comment_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Comment.id", "comment"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "organisation_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Organisation.id", "organisation"), - ("Place.id", "place"), - ), - ( - Loader( - os.path.join(prefix, "person_isLocatedIn_place_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Person.id", "person"), - ("Place.id", "place"), - ), - ], - "hasType": [ - ( - Loader( - os.path.join(prefix, "tag_hasType_tagclass_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Tag.id", "tag"), - ("TagClass.id", "tagclass"), - ) - ], "hasCreator": [ ( Loader( @@ -338,42 +169,6 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Person.id", "person"), ), ], - "containerOf": [ - ( - Loader( - os.path.join(prefix, "forum_containerOf_post_0_0.csv"), - header_row=True, - delimiter="|", - ), - [], - ("Forum.id", "forum"), - ("Post.id", "post"), - ) - ], - "hasMember": [ - ( - Loader( - os.path.join(prefix, "forum_hasMember_person_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["joinDate"], - ("Forum.id", "forum"), - ("Person.id", "person"), - ) - ], - "workAt": [ - ( - Loader( - os.path.join(prefix, "person_workAt_organisation_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["workFrom"], - ("Person.id", "person"), - ("Organisation.id", "organisation"), - ) - ], "likes": [ ( Loader( @@ -396,17 +191,5 @@ def load_ldbc(sess=None, prefix=None, directed=True): ("Post.id", "post"), ), ], - "studyAt": [ - ( - Loader( - os.path.join(prefix, "person_studyAt_organisation_0_0.csv"), - header_row=True, - delimiter="|", - ), - ["classYear"], - ("Person.id", "person"), - ("Organisation.id", "organisation"), - ) - ], } return sess.load_from(edges, vertices, directed, generate_eid=True, retain_oid=True) diff --git a/python/graphscope/framework/dag_utils.py b/python/graphscope/framework/dag_utils.py index f8fe467a516e..e3fa067693bf 100644 --- a/python/graphscope/framework/dag_utils.py +++ b/python/graphscope/framework/dag_utils.py @@ -1081,7 +1081,7 @@ def gremlin_to_subgraph( return op -def archive_graph(graph, path): +def save_to_graphar(graph, path): """Archive a graph to gar format with a path. Args: @@ -1110,25 +1110,30 @@ def archive_graph(graph, path): return op -def save_graph_to( - graph, - path: str, - vineyard_id, - **kwargs, +def serialize_graph( + graph, path: str, storage_options: dict, serialization_options: dict ): """Serialize graph to the specified location + The meta and data of graph is dumped to specified location, + and can be restored by `Graph.load_from` in other sessions. + Each worker will write a `path_{worker_id}.meta` file and + a `path_{worker_id}` file to storage. Args: graph (:class:`graphscope.framework.graph.GraphDAGNode`): Source graph. 
-        path (str): The path to serialize the graph, on each worker.
+        path (str): The path to serialize the graph to, on each worker; supported
+            storages are local, hdfs, oss, and s3.

     Returns:
         An op to serialize the graph to a path.
     """
     config = {
         types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
-        types_pb2.VINEYARD_ID: utils.i_to_attr(vineyard_id),
-        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
+        types_pb2.VINEYARD_ID: utils.i_to_attr(graph._vineyard_id),
+        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)),
+        types_pb2.SERIALIZATION_OPTIONS: utils.s_to_attr(
+            json.dumps(serialization_options)
+        ),
     }
     op = Operation(
         graph.session_id,
@@ -1140,10 +1145,26 @@ def save_graph_to(
     return op


-def load_graph_from(path: str, sess, **kwargs):
+def deserialize_graph(
+    path: str, sess, storage_options: dict, deserialization_options: dict
+):
+    """Deserialize a graph from the specified location.
+
+    Args:
+        path (str): The path containing the serialization files.
+        sess (`graphscope.Session`): The target session
+            that the graph will be constructed in.
+
+    Returns:
+        `Graph`: A new graph object. Its schema and data are expected to be
+        identical to those of the graph that was serialized.
+    """
     config = {
         types_pb2.GRAPH_SERIALIZATION_PATH: utils.s_to_attr(path),
-        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
+        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(storage_options)),
+        types_pb2.DESERIALIZATION_OPTIONS: utils.s_to_attr(
+            json.dumps(deserialization_options)
+        ),
     }
     op = Operation(
         sess.session_id,
diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py
index 281ca3ddf54d..fc763f08038f 100644
--- a/python/graphscope/framework/graph.py
+++ b/python/graphscope/framework/graph.py
@@ -28,6 +28,7 @@ from typing import Mapping
 from typing import Tuple
 from typing import Union
+from urllib.parse import urlparse

 try:
     import vineyard
@@ -116,9 +117,6 @@ def save_to(self, path, **kwargs):
     def load_from(cls, path, sess, **kwargs):
         raise NotImplementedError

-    def archive(self, path, **kwargs):
-        raise NotImplementedError
-
     @abstractmethod
     def project(self, vertices, edges):
         raise NotImplementedError
@@ -457,16 +455,6 @@ def to_dataframe(self, selector, vertex_range=None):
         op = dag_utils.graph_to_dataframe(self, selector, vertex_range)
         return ResultDAGNode(self, op)

-    def archive(self, path):
-        """Archive the graph to gar format with graph yaml file path.
-
-        Args:
-            path (str): The graph yaml file path describe how to archive the graph.
-        """
-        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
-        op = dag_utils.archive_graph(self, path)
-        return ArchivedGraph(self._session, op)
-
     def to_directed(self):
         op = dag_utils.to_directed(self)
         graph_dag_node = GraphDAGNode(self._session, op)
@@ -1164,49 +1152,102 @@ def _check_unmodified(self):
             self.signature == self._saved_signature, "Graph has been modified!"
         )

-    def save_to(self, path, **kwargs):
-        """Serialize graph to a location.
-        The meta and data of graph is dumped to specified location,
-        and can be restored by `Graph.load_from` in other sessions.
-
-        Each worker will write a `path_{worker_id}.meta` file and
-        a `path_{worker_id}` file to storage.
-        Args:
-            path (str): supported storages are local, hdfs, oss, s3
-        """
-
-        op = dag_utils.save_graph_to(self, path, self._vineyard_id, **kwargs)
-        self._session.dag.add_op(op)
-        return self._session._wrapper(op)
+    @staticmethod
+    def _load_from_graphar(path, sess, **kwargs):
+        # GraphAr now only supports the global vertex map.
+        vertex_map = utils.vertex_map_type_to_enum("global")
+        oid_type = utils.get_oid_type_from_graph_info(path)
+        config = {
+            # GraphAr uses the vertex primary key as oid; its type is parsed
+            # from the graph info file (int64 or string).
+            types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
+            types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
+            types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
+            types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
+            types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map),
+            types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
+            types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
+        }
+        op = dag_utils.create_graph(
+            sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
+        )
+        return sess._wrapper(GraphDAGNode(sess, op))

     @classmethod
-    def load_from(cls, path, sess, **kwargs):
-        """Construct a `Graph` by deserialize from `path`.
-        It will read all serialization files, which is dumped by
-        `Graph.serialize`.
-        If any serialize file doesn't exists or broken, will error out.
+    def load_from(cls, uristring, sess=None, **kwargs):
+        """Load an ArrowProperty graph from a certain data source. The data
+        source can be vineyard serialized files or GraphAr files.

         Args:
-            path (str): Path contains the serialization files.
-            sess (`graphscope.Session`): The target session
-                that the graph will be construct in
-
+            uristring (str): A URI that describes the data source, or a path
+                that contains the serialization files,
+                example: "graphar+file:///tmp/graphar/xxx"
+            sess (`graphscope.Session`): The target session that the graph
+                will be constructed in; if None, use the default session.
+            kwargs: Other arguments that will be passed to the data source loader.

         Returns:
-            `Graph`: A new graph object. Schema and data is supposed to be
-            identical with the one that called serialized method.
+            `Graph`: A new graph object.
         """
-        op = dag_utils.load_graph_from(path, sess, **kwargs)
-        return sess._wrapper(GraphDAGNode(sess, op))
+        from graphscope.client.session import get_default_session
+
+        if sess is None:
+            sess = get_default_session()
+        uri = urlparse(uristring)
+        if uri.scheme and "+" in uri.scheme:
+            source = uri.scheme.split("+")[0]
+            path = uri.scheme.split("+")[-1] + "://" + uri.netloc + uri.path
+            if source == "graphar":
+                return cls._load_from_graphar(path, sess)
+            else:
+                raise ValueError("Unknown source: %s" % source)
+        else:
+            # not a URI string, assume it is a path for deserialization
+            storage_options = kwargs.pop("storage_options", {})
+            deserialization_options = kwargs.pop("deserialization_options", {})
+            op = dag_utils.deserialize_graph(
+                uristring, sess, storage_options, deserialization_options
+            )
+            return sess._wrapper(GraphDAGNode(sess, op))

-    def archive(self, path):
-        """Archive graph gar format files base on the graph info.
-        The meta and data of graph is dumped to specified location,
-        and can be restored by `Graph.deserialize` in other sessions.
+    def save_to(
+        self,
+        path,
+        format="serialization",
+        **kwargs,
+    ):
+        """Save the graph to the specified location in the specified format.

         Args:
-            path (str): the graph info file path.
+            path (str): the directory path to write the graph to.
+            format (str): the format to write the graph in; defaults to
+                "serialization".
+            kwargs: Other arguments that will be passed to the data source
+                saver.
+
+        Returns:
+            dict: A dict containing the format type and the URI string of
+            the output data.
         """
-        return self._session._wrapper(self._graph_node.archive(path))
+        if format == "graphar":
+            graphar_options = kwargs.pop("graphar_options", {})
+            graph_info_path = utils.generate_graphar_info_from_schema(
+                path,
+                self._schema,
+                graphar_options,
+            )
+            op = dag_utils.save_to_graphar(self, graph_info_path)
+            self._session.dag.add_op(op)
+            self._session._wrapper(op)
+            return {"type": format, "uri": "graphar+" + graph_info_path}
+        elif format == "serialization":
+            # serialize graph
+            storage_options = kwargs.pop("storage_options", {})
+            serialization_options = kwargs.pop("serialization_options", {})
+            op = dag_utils.serialize_graph(
+                self, path, storage_options, serialization_options
+            )
+            self._session.dag.add_op(op)
+            self._session._wrapper(op)
+            return {"type": format, "uri": path}
+        else:
+            raise ValueError("Unknown format: %s" % format)

     @apply_docstring(GraphDAGNode.add_vertices)
     def add_vertices(
@@ -1299,14 +1340,3 @@ def __init__(self, session, op):
         self._op = op
         # add op to dag
         self._session.dag.add_op(self._op)
-
-
-class ArchivedGraph(DAGNode):
-    """Archived graph node in a DAG"""
-
-    def __init__(self, session, op):
-        super().__init__()
-        self._session = session
-        self._op = op
-        # add op to dag
-        self._session.dag.add_op(self._op)
diff --git a/python/graphscope/framework/graph_builder.py b/python/graphscope/framework/graph_builder.py
index fc1a6e4cb07b..3a3245daa288 100644
--- a/python/graphscope/framework/graph_builder.py
+++ b/python/graphscope/framework/graph_builder.py
@@ -217,58 +217,3 @@ def load_from(
         use_perfect_hash=use_perfect_hash,
     )
     return graph
-
-
-def load_from_gar(
-    graph_info_path: str,
-    directed=True,
-    oid_type="int64_t",
-    vid_type="uint64_t",
-    vertex_map="global",
-    compact_edges=False,
-    use_perfect_hash=False,
-) -> Graph:
-    sess = get_default_session()
-    oid_type = utils.normalize_data_type_str(oid_type)
-    if oid_type not in ("int32_t", "int64_t", "std::string"):
-        raise ValueError("The 'oid_type' can only be int32_t, int64_t or string.")
-    vid_type = utils.normalize_data_type_str(vid_type)
-    if vid_type not in ("uint32_t", "uint64_t"):
-        raise ValueError("The 'vid_type' can only be uint32_t or uint64_t.")
-    if compact_edges:
-        raise ValueError(
-            "Loading from gar with 'compact_edges' hasn't been supported yet."
-        )
-    if use_perfect_hash:
-        raise ValueError(
-            "Loading from gar with 'use_perfect_hash' hasn't been supported yet."
-        )
-    # generate and add a loader op to dag
-    vertex_map = utils.vertex_map_type_to_enum(vertex_map)
-    # construct create graph op
-    config = {
-        types_pb2.DIRECTED: utils.b_to_attr(directed),
-        types_pb2.OID_TYPE: utils.s_to_attr(oid_type),
-        types_pb2.VID_TYPE: utils.s_to_attr(vid_type),
-        types_pb2.GENERATE_EID: utils.b_to_attr(False),
-        types_pb2.RETAIN_OID: utils.b_to_attr(False),
-        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
-        types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
-        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(vertex_map),
-        types_pb2.COMPACT_EDGES: utils.b_to_attr(compact_edges),
-        types_pb2.USE_PERFECT_HASH: utils.b_to_attr(use_perfect_hash),
-        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(graph_info_path),
-    }
-    op = dag_utils.create_graph(
-        sess.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=[], attrs=config
-    )
-    graph = sess.g(
-        op,
-        oid_type=oid_type,
-        vid_type=vid_type,
-        directed=directed,
-        vertex_map=vertex_map,
-        compact_edges=compact_edges,
-        use_perfect_hash=use_perfect_hash,
-    )
-    return graph
diff --git a/python/graphscope/framework/utils.py b/python/graphscope/framework/utils.py
index 94214241b22b..07c1c444619a 100644
--- a/python/graphscope/framework/utils.py
+++ b/python/graphscope/framework/utils.py
@@ -31,10 +31,11 @@ import warnings
 from queue import Empty
 from queue import Queue

 import numpy as np
 import pandas as pd
 import psutil
+import yaml
 from google.protobuf.any_pb2 import Any

 from graphscope.client.archive import OutArchive
@@ -702,3 +704,144 @@ def decorator(func):
         return func

     return decorator
+
+
+def generate_graphar_info_from_schema(path, schema, graphar_options):
+    import copy
+
+    class Dumper(yaml.Dumper):
+        def increase_indent(self, flow=False, indentless=False):
+            return super(Dumper, self).increase_indent(flow, False)
+
+    def PbDataType2InfoType(type_name):
+        if type_name == "INT":
+            return "int32"
+        elif type_name == "LONG":
+            return "int64"
+        elif type_name == "STRING":
+            return "string"
+        elif type_name == "BOOL":
+            return "bool"
+        else:
+            raise ValueError("Invalid type name {}".format(type_name))
+
+    graph_name = graphar_options.get("graph_name", "graph")
+    vertex_block_size = graphar_options.get("vertex_block_size", 262144)  # 2^18
+    edge_block_size = graphar_options.get("edge_block_size", 4194304)  # 2^22
+    file_type = graphar_options.get("file_type", "parquet")
+    version = graphar_options.get("version", "v1")
+    graph_info = dict()
+    graph_info["name"] = graph_name
+    graph_info["version"] = "gar/{}".format(version)
+    # process vertex info
+    graph_info["vertices"] = []
+    graph_info["edges"] = []
+    for vertex_label in schema.vertex_labels:
+        info = dict()
+        info["label"] = vertex_label
+        info["chunk_size"] = vertex_block_size
+        info["prefix"] = "vertex/" + vertex_label + "/"
+        info["version"] = "gar/{}".format(version)
+        info["property_groups"] = [{"properties": [], "file_type": file_type}]
+        for property in schema.get_vertex_properties(vertex_label):
+            info["property_groups"][0]["properties"].append(
+                {
+                    "name": property.name,
+                    "data_type": PbDataType2InfoType(
+                        graph_def_pb2.DataTypePb.Name(property.data_type)
+                    ),
+                    "is_primary": True if property.name == "id" else False,
+                }
+            )
+        output_path = os.path.join(path, vertex_label + ".vertex.yml")
+        with open(output_path, "w") as f:
+            yaml.dump(info, f, Dumper=Dumper, default_flow_style=False)
+        graph_info["vertices"].append(vertex_label + ".vertex.yml")
+    # process edge info
+    for edge_label in schema.edge_labels:
+        properties = []
+        for property in schema.get_edge_properties(edge_label):
+            properties.append(
+                {
+                    "name": property.name,
+                    "data_type": PbDataType2InfoType(
+                        graph_def_pb2.DataTypePb.Name(property.data_type)
+                    ),
+                    "is_primary": False,
+                }
+            )
+        csr_adj_list = {
+            "file_type": file_type,
+            "property_groups": [
+                {"properties": copy.deepcopy(properties), "file_type": file_type}
+            ],
+            "ordered": True,
+            "aligned_by": "src",
+        }
+        csc_adj_list = {
+            "file_type": file_type,
+            "property_groups": [
+                {"properties": copy.deepcopy(properties), "file_type": file_type}
+            ],
+            "ordered": True,
+            "aligned_by": "dst",
+        }
+        for r in schema.get_relationships(edge_label):
+            info = dict()
+            info["prefix"] = (
+                "edge/" + r.source + "_" + edge_label + "_" + r.destination + "/"
+            )
+            info["edge_label"] = edge_label
+            info["src_label"] = r.source
+            info["dst_label"] = r.destination
+            info["chunk_size"] = edge_block_size
+            info["src_chunk_size"] = vertex_block_size
+            info["dst_chunk_size"] = vertex_block_size
+            info["version"] = "gar/{}".format(version)
+            info["adj_lists"] = [csr_adj_list, csc_adj_list]
+            output_path = os.path.join(
+                path, r.source + "_" + edge_label + "_" + r.destination + ".edge.yml"
+            )
+            with open(output_path, "w") as f:
+                yaml.dump(info, f, Dumper=Dumper, default_flow_style=False)
+            graph_info["edges"].append(
+                r.source + "_" + edge_label + "_" + r.destination + ".edge.yml"
+            )
+    graph_info_path = os.path.join(path, graph_name + ".graph.yml")
+    with open(graph_info_path, "w") as f:
+        yaml.dump(graph_info, f, Dumper=Dumper, default_flow_style=False)
+    return graph_info_path
+
+
+def get_oid_type_from_graph_info(path):
+    if "file://" in path:
+        path = path.replace("file://", "")
+    with open(path, "r") as f:
+        graph_info = yaml.safe_load(f)
+    if "vertices" not in graph_info:
+        raise ValueError("Invalid graph info file, no vertices found.")
+    vertex_info_path = graph_info["vertices"][0]
+    if "prefix" in graph_info:
+        prefix = graph_info["prefix"]
+    else:
+        prefix = os.path.dirname(path)
+    with open(os.path.join(prefix, vertex_info_path), "r") as f:
+        vertex_info = yaml.safe_load(f)
+    property_groups = vertex_info["property_groups"]
+    if len(property_groups) == 0:
+        raise ValueError("Invalid vertex info file, no property groups found.")
+    data_type = None
+    for property_group in property_groups:
+        properties = property_group["properties"]
+        if len(properties) == 0:
+            raise ValueError("Invalid vertex info file, no properties found.")
+        # take the type of the first primary key found
+        for property in properties:
+            if property["is_primary"]:
+                data_type = property["data_type"]
+                break
+    if data_type == "int64":
+        return "int64_t"
+    elif data_type == "string":
+        return "std::string"
+    else:
+        raise ValueError(
+            "Invalid vertex info file: the primary key type must be int64 or string."
+        )
diff --git a/python/graphscope/tests/unittest/test_gar.py b/python/graphscope/tests/unittest/test_gar.py
deleted file mode 100644
index 7a2281bd76da..000000000000
--- a/python/graphscope/tests/unittest/test_gar.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-#
-# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import os
-
-import pytest
-
-gar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}")
-
-
-@pytest.mark.skip(reason="Issue 3162")
-def test_load_from_gar(graphscope_session):
-    graph_yaml = os.path.join(
-        gar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml"
-    )
-    print(graph_yaml)
-    graph = graphscope_session.load_from_gar(graph_yaml)
-    assert graph.schema is not None
-    del graph
-
-
-@pytest.mark.skip(reason="Issue 3162")
-def test_archive_to_gar(ldbc_graph):
-    graph_yaml = os.path.join(gar_test_repo_dir, "graphar/ldbc/ldbc.graph.yml")
-    ldbc_graph.archive(graph_yaml)
diff --git a/python/graphscope/tests/unittest/test_graphar.py b/python/graphscope/tests/unittest/test_graphar.py
new file mode 100644
index 000000000000..7b59f9e60323
--- /dev/null
+++ b/python/graphscope/tests/unittest/test_graphar.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+
+from graphscope import pagerank
+from graphscope.framework.graph import Graph
+
+graphar_test_repo_dir = os.path.expandvars("${GS_TEST_DIR}")
+
+
+def test_load_from_graphar(graphscope_session):
+    graph_yaml = os.path.join(
+        graphar_test_repo_dir, "graphar/ldbc_sample/parquet/ldbc_sample.graph.yml"
+    )
+    graph_yaml_path = "graphar+file://" + graph_yaml
+    print(graph_yaml_path)
+    g = Graph.load_from(graph_yaml_path, graphscope_session)
+    ldbc_simple = g.project(vertices={"person": []}, edges={"knows": []})
+    ctx = pagerank(ldbc_simple)
+    print(ctx.to_dataframe({"id": "v.id", "value": "r"}))
+    assert g.schema is not None
+    del g
+
+
+def test_save_to_graphar(ldbc_graph):
+    graphar_options = {
+        "graph_name": "ldbc_sample",
+        "file_type": "parquet",
+        "vertex_block_size": 500,
+        "edge_block_size": 500,
+    }
+    ldbc_graph.save_to(
+        "/tmp/graphar/", format="graphar", graphar_options=graphar_options
+    )
diff --git a/python/setup.py b/python/setup.py
index 44efa5dc197a..c4e75173c64f 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -404,7 +404,7 @@ def parse_version(root, **kwargs):
     package_dir=resolve_graphscope_package_dir(),
     packages=find_graphscope_packages(),
     package_data=parsed_package_data(),
-    ext_modules=build_learning_engine(),
+    # ext_modules=build_learning_engine(),
    cmdclass={
        "build_ext": BuildGLExt,
        "build_gltorch_ext": BuildGLTorchExt,
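Taken together, the patch replaces the old `archive`/`load_from_gar` entry points with a single `Graph.save_to` / `Graph.load_from` pair keyed by a format name or a scheme-tagged URI. A minimal end-to-end sketch of the new surface, assuming a running local session and writable paths under /tmp (the session setup and all paths here are hypothetical, not part of the patch):

import graphscope
from graphscope.dataset.ldbc import load_ldbc
from graphscope.framework.graph import Graph

sess = graphscope.session(cluster_type="hosts")
g = load_ldbc(sess, prefix="/path/to/ldbc_sample")  # hypothetical dataset location

# GraphAr: generate_graphar_info_from_schema() writes <graph_name>.graph.yml plus
# one .vertex.yml/.edge.yml per label under the target directory, then the
# save_to_graphar op dumps the chunked payload files next to them.
result = g.save_to(
    "/tmp/graphar/",
    format="graphar",
    graphar_options={"graph_name": "ldbc_sample", "file_type": "parquet"},
)
print(result)  # {"type": "graphar", "uri": "graphar+/tmp/graphar/ldbc_sample.graph.yml"}

# Reload through a scheme-tagged URI: a "+" in the scheme routes load_from()
# to _load_from_graphar(); anything else is treated as a deserialization path.
g2 = Graph.load_from("graphar+file:///tmp/graphar/ldbc_sample.graph.yml", sess)

# Plain vineyard serialization round-trip: each worker writes a
# path_{worker_id} file and a path_{worker_id}.meta file.
g.save_to("/tmp/serialized_ldbc", format="serialization")
g3 = Graph.load_from("/tmp/serialized_ldbc", sess)

Note the design choice visible in save_to: it returns a dict with the output "type" and "uri" rather than a graph object, so callers can persist the URI and hand it straight back to Graph.load_from in another session.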