Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add local db support with neo4j #47

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions calamus/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
#
# Copyright 2017-2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Calamus db backends."""


class CalamusDbBackend:
    """Interface that Calamus database backends implement.

    Concrete backends override every method below; the versions here only
    raise ``NotImplementedError``. Subclasses that define their own
    ``__init__`` are not expected to call this one.
    """

    def __init__(self, **config):
        """Initialize provided the backend config.

        Args:
            **config: Backend-specific configuration options.

        Raises:
            NotImplementedError: Always; the base class cannot be instantiated.
        """
        # Must be ``raise``: returning a value from ``__init__`` makes Python
        # fail with an unrelated ``TypeError`` instead of signalling that the
        # method is abstract.
        raise NotImplementedError

    def commit(self, entity, schema):
        """Commit an object to the db using the schema.

        Args:
            entity: The object to persist.
            schema: The schema used to serialize ``entity``.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def fetch_by_id(self, identifier, schema=None):
        """Fetch an entity by identifier from the db.

        Args:
            identifier: The identifier of the entity to look up.
            schema: Optional schema describing the entity. It is optional so
                that callers which only know the identifier can use the same
                call on every backend.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError
110 changes: 110 additions & 0 deletions calamus/backends/neo4j.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
#
# Copyright 2020- Swiss Data Science Center (SDSC)
# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
# Eidgenössische Technische Hochschule Zürich (ETHZ).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Neo4J connection utils."""
import json
import time

try:
import requests
except ImportError:
pass

from urllib.parse import quote

try:
import py2neo
from py2neo import Graph
from py2neo.database.work import ClientError
except ImportError:
pass

from . import CalamusDbBackend


class CalamusNeo4JBackend(CalamusDbBackend):
    """Neo4J backend for Calamus.

    Writes go through a ``py2neo`` ``Graph`` over Bolt using the ``n10s``
    procedures; reads are issued as Cypher against the
    ``/rdf/neo4j/cypher`` HTTP endpoint and returned as JSON-LD.
    ``initialize`` must be called before ``commit`` or ``fetch_by_id``.
    """

    def __init__(self, auth=None, host="localhost", bolt_port=7687, http_port=7474):
        """Store connection settings; no connection is opened here.

        Args:
            auth: Mapping with the keys ``"user"`` and ``"pass"``. Defaults to
                ``{"user": "neo4j", "pass": "test"}``.
            host: Host name of the Neo4J server.
            bolt_port: Port used for the Bolt URL.
            http_port: Port used for the HTTP URL.
        """
        # Build the default per instance so instances never share one dict.
        if auth is None:
            auth = {"user": "neo4j", "pass": "test"}

        self.auth = auth
        self.bolt_url = f"bolt://{auth['user']}:{auth['pass']}@{host}:{bolt_port}"
        self.http_url = f"http://{auth['user']}:{auth['pass']}@{host}:{http_port}"

        self.graph = None

    def initialize(self):
        """Initialize the Neo4J graph.

        Connects over Bolt, then tries to initialize the n10s graph config and
        to create the uniqueness constraint on ``Resource.uri``.

        Returns:
            The connected ``Graph`` instance (also stored on ``self.graph``).
        """
        # TODO: provide options for adding extra configs
        self.graph = Graph(self.bolt_url)

        # Each step is attempted on its own so that a ClientError from one
        # does not skip the other.
        # TODO: handle the client error in a more specific way
        # NOTE(review): presumably ClientError here means "already set up" on
        # a previously initialized database - confirm.
        try:
            # initialize the config
            self.graph.call.n10s.graphconfig.init({"handleVocabUris": "KEEP"})
        except ClientError:
            pass

        try:
            # set URI constraint
            self.graph.run(
                """
                CREATE CONSTRAINT n10s_unique_uri ON (r:Resource)
                ASSERT r.uri IS UNIQUE;
                """
            )
        except ClientError:
            pass

        # TODO: provide options for initializing prefixes
        # for prefix, prefix_uri in prefixes:
        #     g.call.n10s.nsprefixes.add(f"{prefix}", f"{prefix_uri}")

        return self.graph

    def commit(self, entity, schema):
        """Commit an object to Neo4J using the schema.

        Args:
            entity: The object to persist.
            schema: Schema class; it is instantiated and its ``dumps`` output
                for ``entity`` is imported as inline JSON-LD.

        Returns:
            The result of the ``n10s.rdf.import.inline`` call.

        Raises:
            RuntimeError: If ``initialize`` has not been called.
        """
        # TODO: use automatic schema retrieval
        # schema = _model_registry[entity.__class__]
        if self.graph is None:
            raise RuntimeError("The graph must first be initialized.")

        # Pass the document as a query parameter rather than splicing it into
        # a quoted Cypher literal: a quote character inside the JSON-LD would
        # otherwise terminate the literal and break or alter the statement.
        return self.graph.run(
            'call n10s.rdf.import.inline($jsonld, "JSON-LD")', jsonld=schema().dumps(entity)
        )

    def fetch_by_id(self, identifier, schema=None):
        """Fetch an entity by id from Neo4J.

        Args:
            identifier: URI of the ``Resource`` node to fetch.
            schema: Accepted for compatibility with ``CalamusDbBackend``;
                not used by this backend.

        Returns:
            The parsed JSON response. When it is a mapping with an
            ``"@graph"`` key, the value of that key is returned instead.

        Raises:
            RuntimeError: If ``initialize`` has not been called.
            requests.HTTPError: If the server answers with an error status.
        """
        if self.graph is None:
            raise RuntimeError("The graph must first be initialized.")

        # Escape backslashes and double quotes so the identifier cannot end
        # the surrounding Cypher string literal.
        escaped = str(identifier).replace("\\", "\\\\").replace('"', '\\"')
        cypher = f"""
            MATCH path=((n:Resource {{uri: "{escaped}"}}) -[*0..1]-> ()) RETURN path
            """

        payload = {"cypher": cypher, "format": "JSON-LD"}

        res = requests.post(
            f"{self.http_url}/rdf/neo4j/cypher",
            data=json.dumps(payload),
            # Explicit keys: relying on dict ordering would swap user and
            # password for a mapping built in the other order.
            auth=(self.auth["user"], self.auth["pass"]),
        )
        # Fail loudly on HTTP errors instead of handing an error document to
        # the deserializer as if it were entity data.
        res.raise_for_status()
        data = res.json()

        # Only a mapping can carry an "@graph" wrapper; any other JSON value
        # is returned unchanged.
        if isinstance(data, dict):
            data = data.get("@graph", data)
        return data

    def query(self, data, schema):
        """Construct a query based on the data and the schema.

        Not implemented yet; currently does nothing and returns ``None``.
        """
14 changes: 12 additions & 2 deletions calamus/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

@total_ordering
class IRIReference(object):
""" Represent an IRI in a namespace.
"""Represent an IRI in a namespace.

Args:
namespace (Namespace): The ``Namespace`` this IRI is part of.
Expand Down Expand Up @@ -342,6 +342,7 @@ def schema(self):
dump_only=self._nested_normalized_option("dump_only"),
lazy=self.root.lazy,
flattened=self.root.flattened,
session=self.root.session,
_visited=self.root._visited,
_top_level=False,
)
Expand Down Expand Up @@ -449,7 +450,16 @@ def _load(self, value, data, partial=None, many=False):

def _dereference_single_id(self, value, attr, **kwargs):
"""Dereference a single id."""
data = kwargs["_all_objects"].get(value, None)
session = kwargs.get("session")

data = None

if kwargs["_all_objects"] is not None:
data = kwargs["_all_objects"].get(value, None)

if not data and session:
data = session.fetch_by_id(value)

if not data:
raise ValueError("Couldn't dereference id {id}".format(id=value))

Expand Down
5 changes: 4 additions & 1 deletion calamus/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def __init__(
unknown=None,
flattened=False,
lazy=False,
session=None,
_all_objects=None,
_visited=None,
_top_level=True,
Expand All @@ -139,7 +140,8 @@ def __init__(
unknown=unknown,
)

self.flattened = flattened
self.session = session
self.flattened = flattened | (session is not None)
self.lazy = lazy
self._top_level = _top_level
self._all_objects = _all_objects
Expand Down Expand Up @@ -326,6 +328,7 @@ def _deserialize(
d_kwargs["_all_objects"] = self._all_objects
d_kwargs["flattened"] = self.flattened
d_kwargs["lazy"] = self.lazy
d_kwargs["session"] = self.session
getter = lambda val: field_obj.deserialize(val, field_name, data, **d_kwargs)
value = self._call_and_store(
getter_func=getter, data=raw_value, field_name=field_name, error_store=error_store, index=index,
Expand Down
Loading