Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: mirror_crud_to_nxcg #65

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nx_arangodb/classes/dict/adj.py
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ def propagate_edge_directed_symmetric(
set_adj_inner_dict_mirror(src_node_id)
set_adj_inner_dict_mirror(dst_node_id)

edge_attr_or_key_dict = set_edge_func( # type: ignore[operator]
edge_attr_or_key_dict = set_edge_func(
src_node_id, dst_node_id, edge_or_edges
)

Expand Down
19 changes: 17 additions & 2 deletions nx_arangodb/classes/digraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from .dict.adj import AdjListOuterDict
from .enum import TraversalDirection
from .function import get_node_id
from .function import get_node_id, mirror_to_nxcg

networkx_api = nxadb.utils.decorators.networkx_class(nx.DiGraph) # type: ignore

Expand Down Expand Up @@ -131,6 +131,15 @@ class DiGraph(Graph, nx.DiGraph):
this operation is irreversible and will result in the loss of all data in
the graph. NOTE: If set to True, Collection Indexes will also be lost.

mirror_crud_to_nxcg : bool (optional, default: False)
Whether to mirror any CRUD operations performed on the NetworkX-ArangoDB Graph
to the cached NetworkX-cuGraph Graph (if available). This allows you to maintain
an up-to-date in-memory NetworkX-cuGraph graph while performing CRUD operations
on the NetworkX-ArangoDB Graph. NOTE: The first time you perform a CRUD
operation on the NetworkX-ArangoDB Graph with an existing NetworkX-cuGraph cache
will require downtime to copy the NetworkX-cuGraph Graph from GPU memory to CPU
memory. Subsequent CRUD operations will not require this downtime.

args: positional arguments for nx.Graph
Additional arguments passed to nx.Graph.

Expand Down Expand Up @@ -161,6 +170,7 @@ def __init__(
symmetrize_edges: bool = False,
use_arango_views: bool = False,
overwrite_graph: bool = False,
mirror_crud_to_nxcg: bool = False,
*args: Any,
**kwargs: Any,
):
Expand All @@ -179,16 +189,18 @@ def __init__(
symmetrize_edges,
use_arango_views,
overwrite_graph,
mirror_crud_to_nxcg,
*args,
**kwargs,
)

if self.graph_exists_in_db:
self.clear_edges = self.clear_edges_override
self.reverse = self.reverse_override

self.add_node = self.add_node_override
self.add_nodes_from = self.add_nodes_from_override
self.remove_node = self.remove_node_override
self.reverse = self.reverse_override

assert isinstance(self._succ, AdjListOuterDict)
assert isinstance(self._pred, AdjListOuterDict)
Expand Down Expand Up @@ -234,6 +246,7 @@ def clear_edges_override(self):

super().clear_edges()

@mirror_to_nxcg
def add_node_override(self, node_for_adding, **attr):
if node_for_adding is None:
raise ValueError("None cannot be a node")
Expand Down Expand Up @@ -269,6 +282,7 @@ def add_node_override(self, node_for_adding, **attr):

nx._clear_cache(self)

@mirror_to_nxcg
def add_nodes_from_override(self, nodes_for_adding, **attr):
for n in nodes_for_adding:
try:
Expand Down Expand Up @@ -312,6 +326,7 @@ def add_nodes_from_override(self, nodes_for_adding, **attr):

nx._clear_cache(self)

@mirror_to_nxcg
def remove_node_override(self, n):
if isinstance(n, (str, int)):
n = get_node_id(str(n), self.default_node_type)
Expand Down
17 changes: 17 additions & 0 deletions nx_arangodb/classes/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from __future__ import annotations

from functools import wraps
from typing import Any, Callable, Generator, Tuple

import networkx as nx
Expand Down Expand Up @@ -932,3 +933,19 @@ def upsert_collection_edges(
)

return results


def mirror_to_nxcg(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
if self.mirror_crud_to_nxcg and self.nxcg_graph is not None:
if "_override" not in func.__name__:
m = f"Function '{func.__name__}' is not an override function."
raise ValueError(m)

func_name = func.__name__.replace("_override", "")
getattr(self.nxcg_graph, func_name)(*args, **kwargs)
return result

return wrapper
209 changes: 131 additions & 78 deletions nx_arangodb/classes/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
node_attr_dict_factory,
node_dict_factory,
)
from .function import get_node_id
from .function import get_node_id, mirror_to_nxcg
from .reportviews import ArangoEdgeView, ArangoNodeView

networkx_api = nxadb.utils.decorators.networkx_class(nx.Graph) # type: ignore
Expand Down Expand Up @@ -165,6 +165,15 @@ class Graph(nx.Graph):
this operation is irreversible and will result in the loss of all data in
the graph. NOTE: If set to True, Collection Indexes will also be lost.

mirror_crud_to_nxcg : bool (optional, default: False)
Whether to mirror any CRUD operations performed on the NetworkX-ArangoDB Graph
to the cached NetworkX-cuGraph Graph (if available). This allows you to maintain
an up-to-date in-memory NetworkX-cuGraph graph while performing CRUD operations
on the NetworkX-ArangoDB Graph. NOTE: The first time you perform a CRUD
operation on the NetworkX-ArangoDB Graph with an existing NetworkX-cuGraph cache
will require downtime to copy the NetworkX-cuGraph Graph from GPU memory to CPU
memory. Subsequent CRUD operations will not require this downtime.

args: positional arguments for nx.Graph
Additional arguments passed to nx.Graph.

Expand Down Expand Up @@ -195,12 +204,14 @@ def __init__(
symmetrize_edges: bool = False,
use_arango_views: bool = False,
overwrite_graph: bool = False,
mirror_crud_to_nxcg: bool = False,
*args: Any,
**kwargs: Any,
):
self.__db = None
self.__use_arango_views = use_arango_views
self.__graph_exists_in_db = False
self.__mirror_crud_to_nxcg = mirror_crud_to_nxcg

self.__set_db(db)
if all([self.__db, name]):
Expand Down Expand Up @@ -261,11 +272,19 @@ def __init__(
self.subgraph = self.subgraph_override
self.clear = self.clear_override
self.clear_edges = self.clear_edges_override
self.add_node = self.add_node_override
self.add_nodes_from = self.add_nodes_from_override
self.number_of_edges = self.number_of_edges_override
self.nbunch_iter = self.nbunch_iter_override

self.add_node = self.add_node_override
self.add_nodes_from = self.add_nodes_from_override
self.remove_node = self.remove_node_override
self.remove_nodes_from = self.remove_nodes_from_override
self.add_edge = self.add_edge_override
self.add_edges_from = self.add_edges_from_override
self.remove_edge = self.remove_edge_override
self.remove_edges_from = self.remove_edges_from_override
self.update = self.update_override

# If incoming_graph_data wasn't loaded by the NetworkX Adapter,
# then we can rely on the CRUD operations of the modified dictionaries
# to load the data into the graph. However, if the graph is directed
Expand Down Expand Up @@ -541,6 +560,10 @@ def is_smart(self) -> bool:
def smart_field(self) -> str | None:
return self.__smart_field

@property
def mirror_crud_to_nxcg(self) -> bool:
return self.__mirror_crud_to_nxcg

###########
# Setters #
###########
Expand Down Expand Up @@ -691,81 +714,6 @@ def clear_edges_override(self):
nbr_dict.clear()
nx._clear_cache(self)

def add_node_override(self, node_for_adding, **attr):
if node_for_adding is None:
raise ValueError("None cannot be a node")

if node_for_adding not in self._node:
self._adj[node_for_adding] = self.adjlist_inner_dict_factory()

######################
# NOTE: monkey patch #
######################

# Old:
# attr_dict = self._node[node_for_adding] = self.node_attr_dict_factory()
# attr_dict.update(attr)

# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = attr
self._node[node_for_adding] = node_attr_dict

# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we can create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.

###########################

else:
self._node[node_for_adding].update(attr)

nx._clear_cache(self)

def add_nodes_from_override(self, nodes_for_adding, **attr):
for n in nodes_for_adding:
try:
newnode = n not in self._node
newdict = attr
except TypeError:
n, ndict = n
newnode = n not in self._node
newdict = attr.copy()
newdict.update(ndict)
if newnode:
if n is None:
raise ValueError("None cannot be a node")
self._adj[n] = self.adjlist_inner_dict_factory()

######################
# NOTE: monkey patch #
######################

# Old:
# self._node[n] = self.node_attr_dict_factory()
#
# self._node[n].update(newdict)

# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = newdict
self._node[n] = node_attr_dict

else:
self._node[n].update(newdict)

# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.

###########################

nx._clear_cache(self)

def number_of_edges_override(self, u=None, v=None):
if u is not None:
return super().number_of_edges(u, v)
Expand Down Expand Up @@ -847,3 +795,108 @@ def bunch_iter(nlist, adj):

bunch = bunch_iter(nbunch, self._adj)
return bunch

@mirror_to_nxcg
def add_node_override(self, node_for_adding, **attr):
if node_for_adding is None:
raise ValueError("None cannot be a node")

if node_for_adding not in self._node:
self._adj[node_for_adding] = self.adjlist_inner_dict_factory()

######################
# NOTE: monkey patch #
######################

# Old:
# attr_dict = self._node[node_for_adding] = self.node_attr_dict_factory()
# attr_dict.update(attr)

# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = attr
self._node[node_for_adding] = node_attr_dict

# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we can create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.

###########################

else:
self._node[node_for_adding].update(attr)

nx._clear_cache(self)

@mirror_to_nxcg
def add_nodes_from_override(self, nodes_for_adding, **attr):
for n in nodes_for_adding:
try:
newnode = n not in self._node
newdict = attr
except TypeError:
n, ndict = n
newnode = n not in self._node
newdict = attr.copy()
newdict.update(ndict)
if newnode:
if n is None:
raise ValueError("None cannot be a node")
self._adj[n] = self.adjlist_inner_dict_factory()

######################
# NOTE: monkey patch #
######################

# Old:
# self._node[n] = self.node_attr_dict_factory()
#
# self._node[n].update(newdict)

# New:
node_attr_dict = self.node_attr_dict_factory()
node_attr_dict.data = newdict
self._node[n] = node_attr_dict

else:
self._node[n].update(newdict)

# Reason:
# We can optimize the process of adding a node by creating avoiding
# the creation of a new dictionary and updating it with the attributes.
# Instead, we create a new node_attr_dict object and set the attributes
# directly. This only makes 1 network call to the database instead of 2.

###########################

nx._clear_cache(self)

@mirror_to_nxcg
def remove_node_override(self, n):
super().remove_node(n)

@mirror_to_nxcg
def remove_nodes_from_override(self, nodes):
super().remove_nodes_from(nodes)

@mirror_to_nxcg
def add_edge_override(self, u, v, **attr):
super().add_edge(u, v, **attr)

@mirror_to_nxcg
def add_edges_from_override(self, ebunch_to_add, **attr):
super().add_edges_from(ebunch_to_add, **attr)

@mirror_to_nxcg
def remove_edge_override(self, u, v):
super().remove_edge(u, v)

@mirror_to_nxcg
def remove_edges_from_override(self, ebunch):
super().remove_edges_from(ebunch)

@mirror_to_nxcg
def update_override(self, *args, **kwargs):
super().update(*args, **kwargs)
Loading
Loading