Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fixes the inconsistent usage of msgpack/json in graph reporter #2843

Merged
merged 1 commit into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions analytical_engine/core/fragment/dynamic_fragment.h
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ class DynamicFragmentMutator {
v_fid = partitioner.GetPartitionId(oid);
if (modify_type == rpc::NX_ADD_NODES) {
vm_ptr_->AddVertex(std::move(oid), gid);
if (!v_data.Empty()) {
if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) {
for (const auto& prop : v_data.GetObject()) {
if (!fragment_->schema_["vertex"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down Expand Up @@ -1606,7 +1606,7 @@ class DynamicFragmentMutator {
std::move(empty_data));
}
}
if (!e_data.Empty()) {
if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) {
for (const auto& prop : e_data.GetObject()) {
if (!fragment_->schema_["edge"].HasMember(prop.name)) {
dynamic::Value key(prop.name);
Expand Down
9 changes: 6 additions & 3 deletions analytical_engine/core/fragment/fragment_reporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ class DynamicFragmentReporter : public grape::Communicator {
}
break;
}

case rpc::HAS_EDGE: {
BOOST_LEAF_AUTO(edge_in_json, params.Get<std::string>(rpc::EDGE));
dynamic::Value edge;
Expand Down Expand Up @@ -761,7 +760,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and nodes attribute array.
arc << gid << nodes_attr;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, nodes_attr);
arc << gid << sbuf;
}
}

Expand Down Expand Up @@ -859,7 +860,9 @@ class ArrowFragmentReporter<
}
}
// archive the start gid and edges attributes array.
arc << gid << adj_list;
msgpack::sbuffer sbuf;
msgpack::pack(&sbuf, adj_list);
arc << gid << sbuf;
}
}

Expand Down
4 changes: 4 additions & 0 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,10 @@ def info(self):
def closed(self):
return self._closed

@property
def disconnected(self):
return self._grpc_client is None or self._disconnected

def eager(self):
return self._config_params["mode"] == "eager"

Expand Down
7 changes: 5 additions & 2 deletions python/graphscope/framework/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ class name.
# add op to dag
self._session.dag.add_op(self._op)

# statically create the unload op to prevent a possible segmentation fault
# inside the protobuf library.
self._unload_op = unload_app(self)

def __repr__(self):
s = f"graphscope.App <type: {self._app_assets.type}, algorithm: {self._app_assets.algo} "
s += f"bounded_graph: {str(self._graph)}>"
Expand Down Expand Up @@ -420,8 +424,7 @@ def _unload(self):
Returns:
:class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode.
"""
op = unload_app(self)
return UnloadedApp(self._session, op)
return UnloadedApp(self._session, self._unload_op)


class App(object):
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/framework/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ def __init__(self, bound_app, graph, *args, **kwargs):
self._op = run_app(self._bound_app, *args, **kwargs)
self._session.dag.add_op(self._op)

# statically create the unload op
self._unload_op = dag_utils.unload_context(self)

def _check_selector(self, selector):
raise NotImplementedError()

Expand Down Expand Up @@ -279,8 +282,7 @@ def __del__(self):
pass

def _unload(self):
op = dag_utils.unload_context(self)
return UnloadedContext(self._session, op)
return UnloadedContext(self._session, self._unload_op)


class TensorContextDAGNode(BaseContextDAGNode):
Expand Down
6 changes: 4 additions & 2 deletions python/graphscope/framework/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ def __init__(
self._resolve_op(incoming_data)
self._session.dag.add_op(self._op)

# statically create the unload op
self._unload_op = dag_utils.unload_graph(self)

@property
def v_labels(self):
return self._v_labels
Expand Down Expand Up @@ -714,8 +717,7 @@ def _unload(self):
Returns:
:class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode.
"""
op = dag_utils.unload_graph(self)
return UnloadedGraph(self._session, op)
return UnloadedGraph(self._session, self._unload_op)

def project(
self,
Expand Down
22 changes: 10 additions & 12 deletions python/graphscope/nx/classes/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ def get_pred_attr(self, n):
def align_node_attr_cache(self):
"""Check and align the node attr cache with node id cache"""
if self.enable_iter_cache and self.node_attr_align is False:
f = self.futures["node_attr"]
if f is not None:
start_gid, self.node_attr_cache = f.result()
if self.futures["node_attr"] is not None:
start_gid, self.node_attr_cache = self.futures["node_attr"].result()
if start_gid == self.iter_pre_gid:
# align to current node_id_cache
if self.iter_gid != self.iter_pre_gid:
Expand All @@ -129,8 +128,7 @@ def align_node_attr_cache(self):
def align_succ_cache(self):
"""Check and align the succ neighbor cache with node id cache"""
if self.enable_iter_cache and self.succ_align is False:
f = self.futures["succ"]
start_gid, self.succ_cache = f.result()
start_gid, self.succ_cache = self.futures["succ"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_cache(self.iter_gid)
Expand All @@ -143,9 +141,8 @@ def align_succ_cache(self):
def align_succ_attr_cache(self):
"""Check and align the succ neighbor attr cache with node id cache"""
if self.enable_iter_cache and self.succ_attr_align is False:
f = self.futures["succ_attr"]
if f is not None:
start_gid, self.succ_attr_cache = f.result()
if self.futures["succ_attr"] is not None:
start_gid, self.succ_attr_cache = self.futures["succ_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_succ_attr_cache(self.iter_gid)
Expand All @@ -160,8 +157,7 @@ def align_pred_cache(self):
if self.enable_iter_cache and self.pred_align is False:
if self.futures["pred"] is None:
self._async_fetch_pred_cache(self.iter_pre_gid)
f = self.futures["pred"]
start_gid, self.pred_cache = f.result()
start_gid, self.pred_cache = self.futures["pred"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_cache(self.iter_gid)
Expand All @@ -177,8 +173,7 @@ def align_pred_attr_cache(self):
if self.enable_iter_cache and self.pred_attr_align is False:
if self.futures["pred_attr"] is None:
self._async_fetch_pred_attr_cache(self.iter_pre_gid)
f = self.futures["pred_attr"]
start_gid, self.pred_attr_cache = f.result()
start_gid, self.pred_attr_cache = self.futures["pred_attr"].result()
if start_gid == self.iter_pre_gid:
if self.iter_gid != self.iter_pre_gid:
self._async_fetch_pred_attr_cache(self.iter_gid)
Expand Down Expand Up @@ -259,6 +254,9 @@ def shutdown(self):
pass
future = None

def shutdown_executor(self):
self.executor.shutdown(wait=True)

def clear(self):
"""Clear batch cache and lru cache, reset the status and warmup again"""
if self.enable_iter_cache:
Expand Down
10 changes: 8 additions & 2 deletions python/graphscope/nx/classes/digraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from graphscope.client.session import get_default_session
from graphscope.client.session import get_session_by_id
from graphscope.framework.dag_utils import copy_graph
from graphscope.framework import dag_utils
from graphscope.framework.errors import check_argument
from graphscope.framework.graph_schema import GraphSchema
from graphscope.nx import NetworkXError
Expand Down Expand Up @@ -295,6 +295,12 @@ def __init__(self, incoming_graph_data=None, default_label=None, **attr):
self._saved_signature = self.signature
self._is_client_view = False

# statically create the unload op
if self.op is None:
self._unload_op = None
else:
self._unload_op = dag_utils.unload_graph(self)

@property
@clear_mutation_cache
@patch_docstring(RefDiGraph.adj)
Expand Down Expand Up @@ -494,7 +500,7 @@ def reverse(self, copy=True):
g = self.__class__(create_empty_in_engine=False)
g.graph = self.graph
g.name = self.name
op = copy_graph(self, "reverse")
op = dag_utils.copy_graph(self, "reverse")
g._op = op
graph_def = op.eval(leaf=False)
g._key = graph_def.key
Expand Down
27 changes: 13 additions & 14 deletions python/graphscope/nx/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#

import copy
import threading

import orjson as json
from networkx import freeze
Expand Down Expand Up @@ -369,29 +368,29 @@ def __init__(self, incoming_graph_data=None, default_label=None, **attr):
self._saved_signature = self.signature
self._is_client_view = False

# statically create the unload op
if self.op is None:
self._unload_op = None
else:
self._unload_op = dag_utils.unload_graph(self)

def _is_gs_graph(self, incoming_graph_data):
return (
hasattr(incoming_graph_data, "graph_type")
and incoming_graph_data.graph_type == graph_def_pb2.ARROW_PROPERTY
)

def __del__(self):
if self._session.info["status"] != "active" or self._key is None:
if self._key is None or self._session.disconnected:
return

# use thread to avoid dead-lock
def _del(graph):
# cancel cache fetch future
if graph.cache.enable_iter_cache:
graph.cache.shutdown()
op = dag_utils.unload_graph(graph)
op.eval()
graph._key = None
if self.cache.enable_iter_cache:
self.cache.shutdown()
self.cache.shutdown_executor()

if not self._is_client_view:
t = threading.Thread(target=_del, args=(self,))
t.daemon = True
t.start()
if not self._is_client_view and self._unload_op is not None:
self._unload_op.eval()
self._key = None

@property
def op(self):
Expand Down
6 changes: 3 additions & 3 deletions python/graphscope/nx/tests/classes/test_graphviews.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@pytest.mark.usefixtures("graphscope_session")
class TestReverseView(test_gvs.TestReverseView):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9, create_using=nx.DiGraph())
self.rv = self.G.reverse(copy=False)
# self.rv = nx.reverse_view(self.G)
Expand Down Expand Up @@ -67,7 +67,7 @@ def test_pickle(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToDirected(test_gvs.TestToDirected):
def setup(self):
def setup_method(self):
self.G = nx.path_graph(9)
self.dv = nx.to_directed(self.G)

Expand All @@ -87,7 +87,7 @@ def test_iter(self):

@pytest.mark.usefixtures("graphscope_session")
class TestToUndirected(test_gvs.TestToUndirected):
def setup(self):
def setup_method(self):
self.DG = nx.path_graph(9, create_using=nx.DiGraph())
self.uv = nx.to_undirected(self.DG)

Expand Down
2 changes: 1 addition & 1 deletion python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ gremlinpython==3.6.3rc1
grpcio>=1.49
grpcio-tools>=1.49
kubernetes>=24.2.0
msgpack
msgpack>=1.0.5
mypy-protobuf>=3.4.0
nest_asyncio
networkx==2.8.0;python_version>="3.8"
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def parsed_package_data():
return {
"graphscope": [
"VERSION",
"proto/*.pyi",
],
}

Expand Down