Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READ via DESCRIBE query #541

Merged
merged 9 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up
version="0.85.12"
version="0.86.0"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.85.12"
__version__ = "0.86.0"
17 changes: 2 additions & 15 deletions cognite/neat/graph/loaders/_rdf2dms.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
from collections import defaultdict
from collections.abc import Iterable, Sequence
from pathlib import Path
from typing import Any, get_args
Expand Down Expand Up @@ -106,8 +105,8 @@ def _load(self, stop_on_exception: bool = False) -> Iterable[dm.InstanceApply |
yield from issues
tracker.issue(issues)
class_name = self.class_by_view_id.get(view.as_id(), view.external_id)
triples = self.graph_store.read(class_name)
for identifier, properties in _triples2dictionary(triples).items():

for identifier, properties in self.graph_store.read(class_name):
try:
yield self._create_node(identifier, properties, pydantic_cls, view_id)
except ValueError as e:
Expand Down Expand Up @@ -318,15 +317,3 @@ def _upload_to_cdf(

def _get_field_value_types(cls, info):
return [type_.__name__ for type_ in get_args(cls.model_fields[info.field_name].annotation)]


def _triples2dictionary(
triples: Iterable[tuple[str, str, str]],
) -> dict[str, dict[str, list[str]]]:
"""Converts list of triples to dictionary"""
values_by_property_by_identifier: dict[str, dict[str, list[str]]] = defaultdict(lambda: defaultdict(list))
for id_, property_, value in triples:
# avoid issue with strings "None", "nan", "null" being treated as values
if value.lower() not in ["", "None", "nan", "null"]:
values_by_property_by_identifier[id_][property_].append(value)
return values_by_property_by_identifier
nikokaoja marked this conversation as resolved.
Show resolved Hide resolved
63 changes: 58 additions & 5 deletions cognite/neat/graph/queries/_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import warnings
from collections import defaultdict
from typing import Literal, cast, overload

from rdflib import RDF, Graph, URIRef
Expand Down Expand Up @@ -93,23 +94,75 @@ def triples_of_type_instances(self, rdf_type: str) -> list[tuple[str, str, str]]
)
return []

def construct_instances_of_class(self, class_: str, properties_optional: bool = True) -> list[tuple[str, str, str]]:
def describe(
    self,
    instance_id: URIRef,
    property_renaming_config: dict | None = None,
) -> tuple[str, dict[str, list[str]]]:
    """DESCRIBE a single instance from the graph store.

    Runs ``DESCRIBE <instance_id>`` against ``self.graph`` and groups the
    resulting values by property.

    Args:
        instance_id: Instance id for which we want to generate query
        property_renaming_config: Dictionary to rename properties, default None.
            Looked up with the property name as returned by
            ``remove_namespace_from_uri`` (presumably the namespace-stripped
            name; verify against the code that builds the config).

    Returns:
        Tuple of (identifier, dictionary of property name -> list of values).
        When no triple survives the filtering below, an empty tuple ``()`` is
        returned instead.
        NOTE(review): callers that unpack the result into two names will fail
        on the empty tuple; confirm they guard against it.
    """

    property_values: dict[str, list[str]] = defaultdict(list)

    # Values that are treated as "no value"; compared against the lower-cased object.
    null_markers = ("", "none", "nan", "null")

    for subject, predicate, object_ in cast(list[ResultRow], self.graph.query(f"DESCRIBE <{instance_id}>")):
        # We cannot include the RDF.type in case there is a neat:type property
        # or if the object is empty
        if predicate == RDF.type or object_.lower() in null_markers:
            continue

        # we are skipping deep validation with Pydantic to remove namespace here
        # as it reduce time to process triples by 10-15x
        # NOTE(review): identifier is overwritten on every accepted triple, so the
        # returned identifier comes from the last accepted triple's subject.
        identifier, property_, value = cast(  # type: ignore[misc]
            (str, str, str),
            remove_namespace_from_uri(*(subject, predicate, object_), validation="prefix"),
        )  # type: ignore[misc, index]

        if property_renaming_config:
            # Rename the key actually used for grouping. Assigning the result to
            # `predicate` (the loop variable) would be discarded and make the
            # renaming configuration a no-op.
            property_ = property_renaming_config.get(property_, property_)

        property_values[property_].append(value)

    if property_values:
        return (
            identifier,
            property_values,
        )
    return ()  # type: ignore [return-value]

def construct_instances_of_class(
self,
class_: str,
properties_optional: bool = True,
instance_id: URIRef | None = None,
) -> list[tuple[str, str, str]]:
"""CONSTRUCT instances for a given class from the graph store

Args:
class_: Class entity for which we want to generate query
properties_optional: Whether to make all properties optional, default True
instance_ids: List of instance ids to filter on, default None (all)

Returns:
List of triples for instances of the given class
"""

if self.rules and (
query := build_construct_query(
ClassEntity(prefix=self.rules.metadata.prefix, suffix=class_),
self.graph,
self.rules,
properties_optional,
class_=ClassEntity(prefix=self.rules.metadata.prefix, suffix=class_),
graph=self.graph,
rules=self.rules,
properties_optional=properties_optional,
instance_id=instance_id,
)
):
result = self.graph.query(query)
Expand Down
28 changes: 15 additions & 13 deletions cognite/neat/graph/queries/_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from ._shared import Triple, hop2property_path

_QUERY_TEMPLATE = """CONSTRUCT {{ {graph_template} }}
WHERE {{ {graph_pattern}
{filter}
WHERE {{ {bind_instance_id}
{graph_pattern}
}}"""


Expand All @@ -28,7 +28,7 @@ def build_construct_query(
graph: Graph,
rules: InformationRules,
properties_optional: bool = True,
class_instances: list[URIRef] | None = None,
instance_id: URIRef | None = None,
) -> str | None:
"""Builds a CONSTRUCT query for a given class and rules and optionally filters by class instances.

Expand All @@ -53,6 +53,7 @@ def build_construct_query(
This is the reason why there is an option to make all properties optional, so that
the query will return all instances that have at least one property defined.
"""

if (
transformations := InformationArchitectRulesAnalysis(rules)
.class_property_pairs(only_rdfpath=True, consider_inheritance=True)
Expand All @@ -63,22 +64,20 @@ def build_construct_query(
)

return _QUERY_TEMPLATE.format(
bind_instance_id=(f"BIND(<{instance_id}> AS ?instance)" if instance_id else ""),
graph_template="\n".join(triples2sparql_statement(templates)),
graph_pattern="\n".join(triples2sparql_statement(patterns)),
filter="" if not class_instances else add_filter(class_instances),
)

else:
return None


def add_filter(class_instances: list[URIRef]):
    """Build a SPARQL FILTER clause restricting ``?instance`` to the given instances.

    Each instance is wrapped in angle brackets and the results are joined
    with ", " inside an ``IN (...)`` list.
    """
    bracketed = (f"<{instance}>" for instance in class_instances)
    in_list = ", ".join(bracketed)
    return f"FILTER (?instance IN ({in_list}))"


def to_construct_triples(
graph: Graph, transformations: list[InformationProperty], prefixes: dict, properties_optional: bool = True
graph: Graph,
transformations: list[InformationProperty],
prefixes: dict,
properties_optional: bool = True,
) -> tuple[list[Triple], list[Triple]]:
"""Converts transformations of a class to CONSTRUCT triples which are used to generate CONSTRUCT query

Expand Down Expand Up @@ -124,7 +123,10 @@ def to_construct_triples(
# use case AllReferences: binding instance to certain rdf property
if isinstance(traversal, AllReferences):
graph_pattern_triple = Triple(
subject="BIND(?instance", predicate="AS", object=f"{graph_template_triple.object})", optional=False
subject="BIND(?instance",
predicate="AS",
object=f"{graph_template_triple.object})",
optional=False,
)

# use case SingleProperty: simple property traversal
Expand All @@ -133,7 +135,7 @@ def to_construct_triples(
subject=graph_template_triple.subject,
predicate=traversal.property.id,
object=graph_template_triple.object,
optional=True if properties_optional else not transformation.is_mandatory,
optional=(True if properties_optional else not transformation.is_mandatory),
)

# use case Hop: property traversal with multiple hops turned into property path
Expand All @@ -143,7 +145,7 @@ def to_construct_triples(
subject="?instance",
predicate=hop2property_path(graph, traversal, prefixes),
object=graph_template_triple.object,
optional=True if properties_optional else not transformation.is_mandatory,
optional=(True if properties_optional else not transformation.is_mandatory),
)

# other type of rdfpaths are skipped
Expand Down
28 changes: 18 additions & 10 deletions cognite/neat/graph/stores/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cognite.neat.graph.models import Triple
from cognite.neat.graph.queries import Queries
from cognite.neat.graph.transformers import Transformers
from cognite.neat.rules.analysis import InformationArchitectRulesAnalysis
from cognite.neat.rules.models import InformationRules
from cognite.neat.rules.models.entities import ClassEntity
from cognite.neat.utils.auxiliary import local_import
Expand Down Expand Up @@ -165,25 +166,32 @@ def write(self, extractor: TripleExtractors) -> None:
)
)

def read(self, class_: str) -> list[tuple[str, str, str]]:
def read(self, class_: str) -> Iterable[tuple[str, dict[str, list[str]]]]:
"""Read instances for given view from the graph store."""
# PLACEHOLDER: Implement reading instances for a given view
# not yet developed

if not self.rules:
warnings.warn(
"No rules found for the graph store, returning empty list.",
stacklevel=2,
)
return []
warnings.warn("Rules not found in graph store!", stacklevel=2)
return None

class_entity = ClassEntity(prefix=self.rules.metadata.prefix, suffix=class_)

if class_entity not in [definition.class_ for definition in self.rules.classes.data]:
warnings.warn("Desired type not found in graph!", stacklevel=2)
return []
return None

if InformationArchitectRulesAnalysis(self.rules).has_hop_transformations():
warnings.warn(
"Rules contain Hop rdfpath, run ReduceHopTraversal transformer first!",
stacklevel=2,
)
return None

instance_ids = self.queries.list_instances_ids_of_class(self.rules.metadata.namespace[class_])

property_renaming_config = InformationArchitectRulesAnalysis(self.rules).define_property_renaming_config()

return self.queries.construct_instances_of_class(class_)
for instance_id in instance_ids:
yield self.queries.describe(instance_id, property_renaming_config)

def _parse_file(
self,
Expand Down
7 changes: 7 additions & 0 deletions cognite/neat/graph/transformers/_rdfpath.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from ._base import BaseTransformer


class ReduceHopTraversal(BaseTransformer):
    """Transformer intended to reduce multi-hop traversals to direct connections.

    The class defines no members of its own here; everything it offers is
    inherited from ``BaseTransformer``.
    """
11 changes: 9 additions & 2 deletions cognite/neat/rules/analysis/_information_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import ValidationError

from cognite.neat.rules.models import SchemaCompleteness
from cognite.neat.rules.models._rdfpath import RDFPath
from cognite.neat.rules.models._rdfpath import Hop, RDFPath
from cognite.neat.rules.models.asset import AssetClass, AssetProperty, AssetRules
from cognite.neat.rules.models.entities import (
AssetEntity,
Expand Down Expand Up @@ -401,7 +401,14 @@ def subset_rules(self, desired_classes: set[ClassEntity]) -> T_Rules:
class InformationArchitectRulesAnalysis(_SharedAnalysis[InformationRules, InformationProperty, InformationClass]):
"""Assumes analysis over only the complete schema"""

...
def has_hop_transformations(self):
    """Tell whether any property in ``self.rules`` has a ``Hop`` traversal.

    Returns:
        True as soon as a property with a truthy ``transformation`` whose
        ``traversal`` is a ``Hop`` instance is found, otherwise False.
    """
    for prop_ in self.rules.properties:
        # Properties without a transformation cannot carry a traversal.
        if not prop_.transformation:
            continue
        if isinstance(prop_.transformation.traversal, Hop):
            return True
    return False

def define_property_renaming_config(self) -> dict[str, str]:
    """Return the property renaming configuration.

    Currently always an empty dictionary.
    """
    # placeholder comes in new PR
    renaming_config: dict[str, str] = {}
    return renaming_config


class AssetArchitectRulesAnalysis(_SharedAnalysis[AssetRules, AssetProperty, AssetClass]):
Expand Down
Loading
Loading