Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying rdflib-sqlalchemy fork #110

Merged
merged 16 commits into from
Feb 21, 2024
Merged
4 changes: 2 additions & 2 deletions .github/workflows/builds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install Poetry
Expand All @@ -33,7 +33,7 @@ jobs:
java-version: '17'
- name: Load cached venv
id: cached-poetry-dependencies
uses: actions/cache@v3
uses: actions/cache@v4
with:
path: .venv
key: venv-${{ runner.os }}-${{ hashFiles('**/poetry.lock') }}
Expand Down
3 changes: 0 additions & 3 deletions .gitmodules

This file was deleted.

9 changes: 9 additions & 0 deletions brickschema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
level=logging.WARNING,
)

has_sqlalchemy = False
try:
import rdflib_sqlalchemy
has_sqlalchemy = True
except ImportError as e:
print(e)
logging.warning(
"sqlalchemy not installed. SQL-backed graph support will not be available. Try 'pip install brickschema[persistence]' to install it."
)

__version__ = "0.2.0"
__all__ = ["graph", "inference", "namespaces"]
54 changes: 41 additions & 13 deletions brickschema/persistent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager
from rdflib import ConjunctiveGraph
from rdflib.graph import BatchAddGraph
from rdflib import plugin
from rdflib import plugin, URIRef
from rdflib.store import Store
from rdflib_sqlalchemy import registerplugins
from sqlalchemy import text, Row
Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, uri: str, *args, **kwargs):
class Changeset(Graph):
def __init__(self, graph_name):
super().__init__()
self.name = graph_name
self.name = URIRef(graph_name)
self.uid = uuid.uuid4()
self.additions = []
self.deletions = []
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(self, uri: str, *args, **kwargs):
"""
To create an in-memory store, use uri="sqlite://"
"""
store = plugin.get("SQLAlchemy", Store)(identifier="my_store")
store = plugin.get("SQLAlchemy", Store)(identifier=URIRef("my_store"))
super().__init__(store, *args, **kwargs)
self.open(uri, create=True)
self._precommit_hooks = OrderedDict()
Expand Down Expand Up @@ -204,7 +204,11 @@ def conn(self):

@contextmanager
def new_changeset(self, graph_name, ts=None):
if not isinstance(graph_name, URIRef):
graph_name = URIRef(graph_name)
namespaces = []
buffered_adds = []
buffered_removes = []
with self.conn() as conn:
transaction_start = time.time()
cs = Changeset(graph_name)
Expand All @@ -220,30 +224,34 @@ def new_changeset(self, graph_name, ts=None):
text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
uid=str(cs.uid),
ts=ts,
graph=graph_name,
graph=str(graph_name),
deletion=True,
triple=pickle.dumps(triple),
)
)
graph = self.get_context(graph_name)
for triple in cs.deletions:
graph.remove(triple)
buffered_removes.append(triple)
#graph = self.get_context(graph_name)
#for triple in cs.deletions:
# graph.remove(triple)
if cs.additions:
for triple in cs.additions:
conn.execute(
text("INSERT INTO changesets VALUES (:uid, :ts, :graph, :deletion, :triple)").bindparams(
uid=str(cs.uid),
ts=ts,
graph=graph_name,
graph=str(graph_name),
deletion=False,
triple=pickle.dumps(triple),
)
)
with BatchAddGraph(
self.get_context(graph_name), batch_size=10000
) as graph:
for triple in cs.additions:
graph.add(triple)
for triple in cs.additions:
buffered_adds.append(triple)
# with BatchAddGraph(
# self.get_context(graph_name), batch_size=10000
# ) as graph:
# for triple in cs.additions:
# graph.add(triple)

# take care of precommit hooks
transaction_end = time.time()
Expand All @@ -259,12 +267,26 @@ def new_changeset(self, graph_name, ts=None):
logging.info(
f"Committing after {transaction_end - transaction_start} seconds"
)
# add the buffered changes to the graph
print([(type(c.identifier), c.identifier) for c in self.contexts()])
graph = self.get_context(graph_name)
for triple in buffered_removes:
print(f"Removing {triple}")
graph.remove(triple)
with BatchAddGraph(graph, batch_size=10000) as graph:
for triple in buffered_adds:
print(f"Adding {triple}")
graph.add(triple)
print(f"Self graph has {len(self)} triples")
# loop through all of the contexts and print length
# update namespaces
for pfx, ns in namespaces:
self.bind(pfx, ns)
for hook in self._postcommit_hooks.values():
hook(self)
self._latest_version = ts
for c in self.contexts():
print(f"{c.identifier} has {len(c)} triples")

def latest(self, graph):
return self.get_context(graph)
Expand All @@ -280,6 +302,7 @@ def graph_at(self, timestamp=None, graph=None):
for t in self.get_context(graph).triples((None, None, None)):
g.add(t)
else:
# TODO: this doesn't work for some reason
for t in self.triples((None, None, None)):
g.add(t)
with self.conn() as conn:
Expand All @@ -295,7 +318,9 @@ def _graph_at(self, alter_graph, conn, timestamp=None, graph=None):
if isinstance(timestamp, (dict, Row)):
timestamp = timestamp["timestamp"]

print(f"Getting graph at {timestamp}", type(timestamp))
print(f"Getting graph {graph} ({type(graph)}) at {timestamp}", type(timestamp))
# print # of rows in changesets
print(f"Changesets has {len(list(conn.execute(text('SELECT * FROM changesets'))))} rows")
if graph is not None:
rows = conn.execute(
text("SELECT * FROM changesets WHERE graph = :g AND timestamp > :ts ORDER BY timestamp DESC").bindparams(
Expand All @@ -309,9 +334,12 @@ def _graph_at(self, alter_graph, conn, timestamp=None, graph=None):
)
)
for row in rows.mappings():
print(f"Row: {row}")
triple = pickle.loads(row["triple"])
if row["is_insertion"]:
print(f"Adding {triple}")
alter_graph.add((triple[0], triple[1], triple[2]))
else:
print(f"Removing {triple}")
alter_graph.remove((triple[0], triple[1], triple[2]))
return alter_graph
Loading