Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python][Client] Unify the graph level load_from and save to API #2919

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion analytical_engine/core/fragment/arrow_projected_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,7 @@ class ArrowProjectedFragment
const int64_t* offset_values = offsets->raw_values();
const int64_t* boffset_values = boffsets->raw_values();
vineyard::parallel_for(
static_cast<vid_t>(0), fragment->tvnums_[v_label],
static_cast<vid_t>(0), fragment->ivnums_[v_label],
[&](vid_t i) {
int64_t begin = offset_values[i], end = offset_values[i + 1];
int64_t bbegin = boffset_values[i], bend = boffset_values[i + 1];
Expand Down
2 changes: 0 additions & 2 deletions analytical_engine/frame/property_graph_frame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ LoadGraph(const grape::CommSpec& comm_spec, vineyard::Client& client,
#ifdef ENABLE_GAR
BOOST_LEAF_AUTO(graph_info_path,
params.Get<std::string>(gs::rpc::GRAPH_INFO_PATH));
BOOST_LEAF_ASSIGN(generate_eid, params.Get<bool>(gs::rpc::GENERATE_EID));
BOOST_LEAF_ASSIGN(retain_oid, params.Get<bool>(gs::rpc::RETAIN_OID));
using loader_t =
vineyard::gar_fragment_loader_t<oid_t, vid_t, vertex_map_t>;
loader_t loader(client, comm_spec, graph_info_path);
Expand Down
4 changes: 4 additions & 0 deletions coordinator/gscoordinator/op_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
"\n"
)
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
serialization_options = json.loads(op.attr[types_pb2.SERIALIZATION_OPTIONS].s.decode())
vineyard_endpoint = self._launcher.vineyard_endpoint
vineyard_ipc_socket = self._launcher.vineyard_socket
deployment, hosts = self._launcher.get_vineyard_stream_info()
Expand All @@ -738,6 +739,7 @@ def _process_serialize_graph(self, op: op_def_pb2.OpDef):
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=storage_options,
serialization_options=serialization_options,
deployment=deployment,
hosts=hosts,
)
Expand All @@ -758,6 +760,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
"\n"
)
storage_options = json.loads(op.attr[types_pb2.STORAGE_OPTIONS].s.decode())
deseralization_options = json.loads(op.attr[types_pb2.DESERIALIZATION_OPTIONS].s.decode())
vineyard_endpoint = self._launcher.vineyard_endpoint
vineyard_ipc_socket = self._launcher.vineyard_socket
deployment, hosts = self._launcher.get_vineyard_stream_info()
Expand All @@ -768,6 +771,7 @@ def _process_deserialize_graph(self, op: op_def_pb2.OpDef):
vineyard_ipc_socket=vineyard_ipc_socket,
vineyard_endpoint=vineyard_endpoint,
storage_options=storage_options,
deseralization_options=deseralization_options,
deployment=deployment,
hosts=hosts,
)
Expand Down
2 changes: 2 additions & 0 deletions proto/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ enum ParamKey {
FD = 323; // file descriptor
SOURCE = 324;
WRITE_OPTIONS = 325;
SERIALIZATION_OPTIONS = 326;
DESERIALIZATION_OPTIONS = 327;

// large attr
CHUNK_NAME = 341;
Expand Down
1 change: 0 additions & 1 deletion python/graphscope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
from graphscope.framework.errors import *
from graphscope.framework.graph import Graph
from graphscope.framework.graph_builder import load_from
from graphscope.framework.graph_builder import load_from_gar
from graphscope.version import __version__

__doc__ = """
Expand Down
6 changes: 0 additions & 6 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,12 +1162,6 @@ def load_from(self, *args, **kwargs):
with default_session(self):
return graphscope.load_from(*args, **kwargs)

def load_from_gar(self, *args, **kwargs):
"""Load a graph from gar format files within the session.
See more information in :meth:`graphscope.load_from_gar`.
"""
with default_session(self):
return graphscope.load_from_gar(*args, **kwargs)

@deprecated("Please use `sess.interactive` instead.")
def gremlin(self, graph, params=None):
Expand Down
217 changes: 0 additions & 217 deletions python/graphscope/dataset/ldbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
["creationDate", "locationIP", "browserUsed", "content", "length"],
"id",
),
"organisation": (
Loader(
os.path.join(prefix, "organisation_0_0.csv"),
header_row=True,
delimiter="|",
),
["type", "name", "url"],
"id",
),
"tagclass": (
Loader(
os.path.join(prefix, "tagclass_0_0.csv"), header_row=True, delimiter="|"
),
["name", "url"],
"id",
),
"person": (
Loader(
os.path.join(prefix, "person_0_0.csv"), header_row=True, delimiter="|"
Expand All @@ -112,20 +96,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
],
"id",
),
"forum": (
Loader(
os.path.join(prefix, "forum_0_0.csv"), header_row=True, delimiter="|"
),
["title", "creationDate"],
"id",
),
"place": (
Loader(
os.path.join(prefix, "place_0_0.csv"), header_row=True, delimiter="|"
),
["name", "url", "type"],
"id",
),
"post": (
Loader(
os.path.join(prefix, "post_0_0.csv"), header_row=True, delimiter="|"
Expand All @@ -141,11 +111,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
],
"id",
),
"tag": (
Loader(os.path.join(prefix, "tag_0_0.csv"), header_row=True, delimiter="|"),
["name", "url"],
"id",
),
}
edges = {
"replyOf": [
Expand All @@ -170,62 +135,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
("Post.id", "post"),
),
],
"isPartOf": [
(
Loader(
os.path.join(prefix, "place_isPartOf_place_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Place.id", "place"),
("Place.id.1", "place"),
)
],
"isSubclassOf": [
(
Loader(
os.path.join(prefix, "tagclass_isSubclassOf_tagclass_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("TagClass.id", "tagclass"),
("TagClass.id.1", "tagclass"),
)
],
"hasTag": [
(
Loader(
os.path.join(prefix, "forum_hasTag_tag_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Forum.id", "forum"),
("Tag.id", "tag"),
),
(
Loader(
os.path.join(prefix, "comment_hasTag_tag_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Comment.id", "comment"),
("Tag.id", "tag"),
),
(
Loader(
os.path.join(prefix, "post_hasTag_tag_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Post.id", "post"),
("Tag.id", "tag"),
),
],
"knows": [
(
Loader(
Expand All @@ -238,84 +147,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
("Person.id.1", "person"),
)
],
"hasModerator": [
(
Loader(
os.path.join(prefix, "forum_hasModerator_person_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Forum.id", "forum"),
("Person.id", "person"),
)
],
"hasInterest": [
(
Loader(
os.path.join(prefix, "person_hasInterest_tag_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Person.id", "person"),
("Tag.id", "tag"),
)
],
"isLocatedIn": [
(
Loader(
os.path.join(prefix, "post_isLocatedIn_place_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Post.id", "post"),
("Place.id", "place"),
),
(
Loader(
os.path.join(prefix, "comment_isLocatedIn_place_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Comment.id", "comment"),
("Place.id", "place"),
),
(
Loader(
os.path.join(prefix, "organisation_isLocatedIn_place_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Organisation.id", "organisation"),
("Place.id", "place"),
),
(
Loader(
os.path.join(prefix, "person_isLocatedIn_place_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Person.id", "person"),
("Place.id", "place"),
),
],
"hasType": [
(
Loader(
os.path.join(prefix, "tag_hasType_tagclass_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Tag.id", "tag"),
("TagClass.id", "tagclass"),
)
],
"hasCreator": [
(
Loader(
Expand All @@ -338,42 +169,6 @@ def load_ldbc(sess=None, prefix=None, directed=True):
("Person.id", "person"),
),
],
"containerOf": [
(
Loader(
os.path.join(prefix, "forum_containerOf_post_0_0.csv"),
header_row=True,
delimiter="|",
),
[],
("Forum.id", "forum"),
("Post.id", "post"),
)
],
"hasMember": [
(
Loader(
os.path.join(prefix, "forum_hasMember_person_0_0.csv"),
header_row=True,
delimiter="|",
),
["joinDate"],
("Forum.id", "forum"),
("Person.id", "person"),
)
],
"workAt": [
(
Loader(
os.path.join(prefix, "person_workAt_organisation_0_0.csv"),
header_row=True,
delimiter="|",
),
["workFrom"],
("Person.id", "person"),
("Organisation.id", "organisation"),
)
],
"likes": [
(
Loader(
Expand All @@ -396,17 +191,5 @@ def load_ldbc(sess=None, prefix=None, directed=True):
("Post.id", "post"),
),
],
"studyAt": [
(
Loader(
os.path.join(prefix, "person_studyAt_organisation_0_0.csv"),
header_row=True,
delimiter="|",
),
["classYear"],
("Person.id", "person"),
("Organisation.id", "organisation"),
)
],
}
return sess.load_from(edges, vertices, directed, generate_eid=True, retain_oid=True)
Loading