diff --git a/analytical_engine/core/fragment/dynamic_fragment.h b/analytical_engine/core/fragment/dynamic_fragment.h index 5cea4fe47686..1fc009665707 100644 --- a/analytical_engine/core/fragment/dynamic_fragment.h +++ b/analytical_engine/core/fragment/dynamic_fragment.h @@ -1529,7 +1529,7 @@ class DynamicFragmentMutator { v_fid = partitioner.GetPartitionId(oid); if (modify_type == rpc::NX_ADD_NODES) { vm_ptr_->AddVertex(std::move(oid), gid); - if (!v_data.Empty()) { + if (v_data.IsObject() && !v_data.GetObject().ObjectEmpty()) { for (const auto& prop : v_data.GetObject()) { if (!fragment_->schema_["vertex"].HasMember(prop.name)) { dynamic::Value key(prop.name); @@ -1606,7 +1606,7 @@ class DynamicFragmentMutator { std::move(empty_data)); } } - if (!e_data.Empty()) { + if (e_data.IsObject() && !e_data.GetObject().ObjectEmpty()) { for (const auto& prop : e_data.GetObject()) { if (!fragment_->schema_["edge"].HasMember(prop.name)) { dynamic::Value key(prop.name); diff --git a/analytical_engine/core/fragment/fragment_reporter.h b/analytical_engine/core/fragment/fragment_reporter.h index ad3f78156f1c..d055d6d0cf2d 100644 --- a/analytical_engine/core/fragment/fragment_reporter.h +++ b/analytical_engine/core/fragment/fragment_reporter.h @@ -105,7 +105,6 @@ class DynamicFragmentReporter : public grape::Communicator { } break; } - case rpc::HAS_EDGE: { BOOST_LEAF_AUTO(edge_in_json, params.Get(rpc::EDGE)); dynamic::Value edge; @@ -761,7 +760,9 @@ class ArrowFragmentReporter< } } // archive the start gid and nodes attribute array. - arc << gid << nodes_attr; + msgpack::sbuffer sbuf; + msgpack::pack(&sbuf, nodes_attr); + arc << gid << sbuf; } } @@ -859,7 +860,9 @@ class ArrowFragmentReporter< } } // archive the start gid and edges attributes array. - arc << gid << adj_list; + msgpack::sbuffer sbuf; + msgpack::pack(&sbuf, adj_list); + arc << gid << sbuf; } } diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index 57d70e4d6cc4..83eb68dd7d3c 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -787,6 +787,10 @@ def info(self): def closed(self): return self._closed + @property + def disconnected(self): + return self._grpc_client is None or self._disconnected + def eager(self): return self._config_params["mode"] == "eager" diff --git a/python/graphscope/framework/app.py b/python/graphscope/framework/app.py index 4ff78a2b34d8..ea62fd13c9ad 100644 --- a/python/graphscope/framework/app.py +++ b/python/graphscope/framework/app.py @@ -352,6 +352,10 @@ class name. # add op to dag self._session.dag.add_op(self._op) + # statically create the unload op to prevent a possible segmentation fault + # inside the protobuf library. + self._unload_op = unload_app(self) + def __repr__(self): s = f"graphscope.App " @@ -420,8 +424,7 @@ def _unload(self): Returns: :class:`graphscope.framework.app.UnloadedApp`: Evaluated in eager mode. """ - op = unload_app(self) - return UnloadedApp(self._session, op) + return UnloadedApp(self._session, self._unload_op) class App(object): diff --git a/python/graphscope/framework/context.py b/python/graphscope/framework/context.py index 4e09e98bb356..4b6f94b66383 100644 --- a/python/graphscope/framework/context.py +++ b/python/graphscope/framework/context.py @@ -121,6 +121,9 @@ def __init__(self, bound_app, graph, *args, **kwargs): self._op = run_app(self._bound_app, *args, **kwargs) self._session.dag.add_op(self._op) + # statically create the unload op + self._unload_op = dag_utils.unload_context(self) + def _check_selector(self, selector): raise NotImplementedError() @@ -279,8 +282,7 @@ def __del__(self): pass def _unload(self): - op = dag_utils.unload_context(self) - return UnloadedContext(self._session, op) + return UnloadedContext(self._session, self._unload_op) class TensorContextDAGNode(BaseContextDAGNode): diff --git a/python/graphscope/framework/graph.py b/python/graphscope/framework/graph.py index bf3236cd3278..3b3243798608 100644 --- a/python/graphscope/framework/graph.py +++ b/python/graphscope/framework/graph.py @@ -292,6 +292,9 @@ def __init__( self._resolve_op(incoming_data) self._session.dag.add_op(self._op) + # statically create the unload op + self._unload_op = dag_utils.unload_graph(self) + @property def v_labels(self): return self._v_labels @@ -714,8 +717,7 @@ def _unload(self): Returns: :class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode. """ - op = dag_utils.unload_graph(self) - return UnloadedGraph(self._session, op) + return UnloadedGraph(self._session, self._unload_op) def project( self, diff --git a/python/graphscope/nx/classes/cache.py b/python/graphscope/nx/classes/cache.py index d6012ee4bc58..f961b0c0c2f1 100644 --- a/python/graphscope/nx/classes/cache.py +++ b/python/graphscope/nx/classes/cache.py @@ -113,9 +113,8 @@ def get_pred_attr(self, n): def align_node_attr_cache(self): """Check and align the node attr cache with node id cache""" if self.enable_iter_cache and self.node_attr_align is False: - f = self.futures["node_attr"] - if f is not None: - start_gid, self.node_attr_cache = f.result() + if self.futures["node_attr"] is not None: + start_gid, self.node_attr_cache = self.futures["node_attr"].result() if start_gid == self.iter_pre_gid: # align to current node_id_cache if self.iter_gid != self.iter_pre_gid: @@ -129,8 +128,7 @@ def align_node_attr_cache(self): def align_succ_cache(self): """Check and align the succ neighbor cache with node id cache""" if self.enable_iter_cache and self.succ_align is False: - f = self.futures["succ"] - start_gid, self.succ_cache = f.result() + start_gid, self.succ_cache = self.futures["succ"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_succ_cache(self.iter_gid) @@ -143,9 +141,8 @@ def align_succ_cache(self): def align_succ_attr_cache(self): """Check and align the succ neighbor attr cache with node id cache""" if self.enable_iter_cache and self.succ_attr_align is False: - f = self.futures["succ_attr"] - if f is not None: - start_gid, self.succ_attr_cache = f.result() + if self.futures["succ_attr"] is not None: + start_gid, self.succ_attr_cache = self.futures["succ_attr"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_succ_attr_cache(self.iter_gid) @@ -160,8 +157,7 @@ def align_pred_cache(self): if self.enable_iter_cache and self.pred_align is False: if self.futures["pred"] is None: self._async_fetch_pred_cache(self.iter_pre_gid) - f = self.futures["pred"] - start_gid, self.pred_cache = f.result() + start_gid, self.pred_cache = self.futures["pred"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_pred_cache(self.iter_gid) @@ -177,8 +173,7 @@ def align_pred_attr_cache(self): if self.enable_iter_cache and self.pred_attr_align is False: if self.futures["pred_attr"] is None: self._async_fetch_pred_attr_cache(self.iter_pre_gid) - f = self.futures["pred_attr"] - start_gid, self.pred_attr_cache = f.result() + start_gid, self.pred_attr_cache = self.futures["pred_attr"].result() if start_gid == self.iter_pre_gid: if self.iter_gid != self.iter_pre_gid: self._async_fetch_pred_attr_cache(self.iter_gid) @@ -259,6 +254,9 @@ def shutdown(self): pass future = None + def shutdown_executor(self): + self.executor.shutdown(wait=True) + def clear(self): """Clear batch cache and lru cache, reset the status and warmup again""" if self.enable_iter_cache: diff --git a/python/graphscope/nx/classes/digraph.py b/python/graphscope/nx/classes/digraph.py index 628712dfdb22..101330d65b87 100644 --- a/python/graphscope/nx/classes/digraph.py +++ b/python/graphscope/nx/classes/digraph.py @@ -29,7 +29,7 @@ from graphscope.client.session import get_default_session from graphscope.client.session import get_session_by_id -from graphscope.framework.dag_utils import copy_graph +from graphscope.framework import dag_utils from graphscope.framework.errors import check_argument from graphscope.framework.graph_schema import GraphSchema from graphscope.nx import NetworkXError @@ -295,6 +295,12 @@ def __init__(self, incoming_graph_data=None, default_label=None, **attr): self._saved_signature = self.signature self._is_client_view = False + # statically create the unload op + if self.op is None: + self._unload_op = None + else: + self._unload_op = dag_utils.unload_graph(self) + @property @clear_mutation_cache @patch_docstring(RefDiGraph.adj) @@ -494,7 +500,7 @@ def reverse(self, copy=True): g = self.__class__(create_empty_in_engine=False) g.graph = self.graph g.name = self.name - op = copy_graph(self, "reverse") + op = dag_utils.copy_graph(self, "reverse") g._op = op graph_def = op.eval(leaf=False) g._key = graph_def.key diff --git a/python/graphscope/nx/classes/graph.py b/python/graphscope/nx/classes/graph.py index bdfd7c4989a7..7b0feac84767 100644 --- a/python/graphscope/nx/classes/graph.py +++ b/python/graphscope/nx/classes/graph.py @@ -20,7 +20,6 @@ # import copy -import threading import orjson as json from networkx import freeze @@ -369,6 +368,12 @@ def __init__(self, incoming_graph_data=None, default_label=None, **attr): self._saved_signature = self.signature self._is_client_view = False + # statically create the unload op + if self.op is None: + self._unload_op = None + else: + self._unload_op = dag_utils.unload_graph(self) + def _is_gs_graph(self, incoming_graph_data): return ( hasattr(incoming_graph_data, "graph_type") @@ -376,22 +381,16 @@ def _is_gs_graph(self, incoming_graph_data): ) def __del__(self): - if self._session.info["status"] != "active" or self._key is None: + if self._key is None or self._session.disconnected: return - # use thread to avoid dead-lock - def _del(graph): - # cancel cache fetch future - if graph.cache.enable_iter_cache: - graph.cache.shutdown() - op = dag_utils.unload_graph(graph) - op.eval() - graph._key = None + if self.cache.enable_iter_cache: + self.cache.shutdown() + self.cache.shutdown_executor() - if not self._is_client_view: - t = threading.Thread(target=_del, args=(self,)) - t.daemon = True - t.start() + if not self._is_client_view and self._unload_op is not None: + self._unload_op.eval() + self._key = None @property def op(self): diff --git a/python/graphscope/nx/tests/classes/test_graphviews.py b/python/graphscope/nx/tests/classes/test_graphviews.py index 0aaffadda3c8..455154ab093a 100644 --- a/python/graphscope/nx/tests/classes/test_graphviews.py +++ b/python/graphscope/nx/tests/classes/test_graphviews.py @@ -29,7 +29,7 @@ @pytest.mark.usefixtures("graphscope_session") class TestReverseView(test_gvs.TestReverseView): - def setup(self): + def setup_method(self): self.G = nx.path_graph(9, create_using=nx.DiGraph()) self.rv = self.G.reverse(copy=False) # self.rv = nx.reverse_view(self.G) @@ -67,7 +67,7 @@ def test_pickle(self): @pytest.mark.usefixtures("graphscope_session") class TestToDirected(test_gvs.TestToDirected): - def setup(self): + def setup_method(self): self.G = nx.path_graph(9) self.dv = nx.to_directed(self.G) @@ -87,7 +87,7 @@ def test_iter(self): @pytest.mark.usefixtures("graphscope_session") class TestToUndirected(test_gvs.TestToUndirected): - def setup(self): + def setup_method(self): self.DG = nx.path_graph(9, create_using=nx.DiGraph()) self.uv = nx.to_undirected(self.DG) diff --git a/python/requirements.txt b/python/requirements.txt index c25860bc1950..04bbb677676b 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -3,7 +3,7 @@ gremlinpython==3.6.3rc1 grpcio>=1.49 grpcio-tools>=1.49 kubernetes>=24.2.0 -msgpack +msgpack>=1.0.5 mypy-protobuf>=3.4.0 nest_asyncio networkx==2.8.0;python_version>="3.8" diff --git a/python/setup.py b/python/setup.py index 7a06946a86e1..827beca0d973 100644 --- a/python/setup.py +++ b/python/setup.py @@ -193,6 +193,7 @@ def parsed_package_data(): return { "graphscope": [ "VERSION", + "proto/*.pyi", ], }