Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Wrapping up asset loader #551

Merged
merged 14 commits into from
Jul 19, 2024
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
.PHONY: run-explorer run-tests run-linters build-ui build-python build-docker run-docker compose-up
version="0.87.2"
version="0.87.3"
run-explorer:
@echo "Running explorer API server..."
# open "http://localhost:8000/static/index.html" || true
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.87.2"
__version__ = "0.87.3"
116 changes: 97 additions & 19 deletions cognite/neat/graph/loaders/_rdf2asset.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from collections import defaultdict
import json
from collections.abc import Iterable, Sequence
from dataclasses import dataclass, fields
from pathlib import Path
from typing import cast

import yaml
from cognite.client import CogniteClient
from cognite.client.data_classes import AssetWrite
from cognite.client.data_classes.capabilities import Capability
from cognite.client.data_classes.capabilities import AssetsAcl, Capability
from cognite.client.exceptions import CogniteAPIError

from cognite.neat.graph._tracking.base import Tracker
from cognite.neat.graph._tracking.log import LogTracker
Expand Down Expand Up @@ -81,10 +84,20 @@ def __init__(
self.rules = rules
self.data_set_id = data_set_id
self.use_labels = use_labels
self.use_orphanage = use_orphanage

self.orphanage_external_id = (
f"{asset_external_id_prefix or ''}orphanage-{data_set_id}" if use_orphanage else None
self.orphanage = (
AssetWrite.load(
{
"dataSetId": self.data_set_id,
"externalId": (
f"{asset_external_id_prefix or ''}orphanage-{data_set_id}" if use_orphanage else None
),
"name": "Orphanage",
"description": "Orphanage for assets whose parents do not exist",
}
)
if use_orphanage
else None
)

self.asset_external_id_prefix = asset_external_id_prefix
Expand All @@ -111,6 +124,12 @@ def _load(self, stop_on_exception: bool = False) -> Iterable[AssetWrite | NeatIs
"classes",
)

processed_instances = set()

if self.orphanage:
yield self.orphanage
processed_instances.add(self.orphanage.external_id)

for class_ in ordered_classes:
tracker.start(repr(class_.id))

Expand All @@ -119,29 +138,57 @@ def _load(self, stop_on_exception: bool = False) -> Iterable[AssetWrite | NeatIs
for identifier, properties in self.graph_store.read(class_.suffix):
fields = _process_properties(properties, property_renaming_config)
# set data set id and external id
fields["data_set_id"] = self.data_set_id
fields["external_id"] = identifier
fields["dataSetId"] = self.data_set_id
fields["externalId"] = identifier

# check on parent
if "parentExternalId" in fields and fields["parentExternalId"] not in processed_instances:
error = loader_issues.InvalidInstanceError(
type_="asset",
identifier=identifier,
reason=(
f"Parent asset {fields['parentExternalId']} does not exist or failed creation"
f""" {
f', moving the asset {identifier} under orphanage {self.orphanage.external_id}'
if self.orphanage
else ''}"""
),
)
tracker.issue(error)
if stop_on_exception:
raise error.as_exception()
yield error

# if orphanage is set asset will use orphanage as parent
if self.orphanage:
fields["parentExternalId"] = self.orphanage.external_id

# otherwise asset will be skipped
else:
continue

try:
yield AssetWrite.load(fields)
processed_instances.add(identifier)
except KeyError as e:
error = loader_issues.InvalidInstanceError(type_="asset", identifier=identifier, reason=str(e))
tracker.issue(error)
if stop_on_exception:
raise error.as_exception() from e
yield error
yield _END_OF_CLASS

def load_to_cdf(self, client: CogniteClient, dry_run: bool = False) -> Sequence[AssetWrite]:
# generate assets
# check for circular asset hierarchy
# check for orphaned assets
# batch upsert of assets to CDF (otherwise we will hit the API rate limit)

raise NotImplementedError("Not implemented yet, this is placeholder")
yield _END_OF_CLASS

def _get_required_capabilities(self) -> list[Capability]:
raise NotImplementedError("Not implemented yet, this is placeholder")
return [
AssetsAcl(
actions=[
AssetsAcl.Action.Write,
AssetsAcl.Action.Read,
],
scope=AssetsAcl.Scope.DataSet([self.data_set_id]),
)
]

def _upload_to_cdf(
self,
Expand All @@ -150,14 +197,45 @@ def _upload_to_cdf(
dry_run: bool,
read_issues: NeatIssueList,
) -> Iterable[UploadResult]:
raise NotImplementedError("Not implemented yet, this is placeholder")
try:
upserted = client.assets.upsert(items, mode="replace")
except CogniteAPIError as e:
result = UploadResult[str](name="Asset", issues=read_issues)
result.error_messages.append(str(e))
result.failed_upserted.update(item.as_id() for item in e.failed + e.unknown)
result.upserted.update(item.as_id() for item in e.successful)
yield result
else:
for asset in upserted:
result = UploadResult[str](name="asset", issues=read_issues)
result.upserted.add(cast(str, asset.external_id))
yield result

def write_to_file(self, filepath: Path) -> None:
raise NotImplementedError("Not implemented yet, this is placeholder")
if filepath.suffix not in [".json", ".yaml", ".yml"]:
raise ValueError(f"File format {filepath.suffix} is not supported")
dumped: dict[str, list] = {"assets": []}
for item in self.load(stop_on_exception=False):
key = {
AssetWrite: "assets",
NeatIssue: "issues",
_END_OF_CLASS: "end_of_class",
}.get(type(item))
if key is None:
# This should never happen, and is a bug in neat
raise ValueError(f"Item {item} is not supported. This is a bug in neat please report it.")
if key == "end_of_class":
continue
dumped[key].append(item.dump())
with filepath.open("w", encoding=self._encoding, newline=self._new_line) as f:
if filepath.suffix == ".json":
json.dump(dumped, f, indent=2)
else:
yaml.safe_dump(dumped, f, sort_keys=False)


def _process_properties(properties: dict[str, list[str]], property_renaming_config: dict[str, str]) -> dict:
metadata: dict[str, str] = defaultdict(str)
metadata: dict[str, str] = {}
fields: dict[str, str | dict] = {}

for original_property, values in properties.items():
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/rules/analysis/_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def class_topological_sort(self) -> list[ClassEntity]:
child_parent_asset[class_] = set()
for property_ in properties.values():
if any(
cast(AssetEntity, implementation).property_ == AssetFields.parent_external_id
cast(AssetEntity, implementation).property_ == AssetFields.parentExternalId
for implementation in property_.implementation
):
child_parent_asset[property_.class_].add(cast(ClassEntity, property_.value_type))
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/rules/models/asset/_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def _parent_property_point_to_class(self) -> None:
for implementation in property_.implementation:
if (
isinstance(implementation, AssetEntity)
and implementation.property_ == AssetFields.parent_external_id
and implementation.property_ == AssetFields.parentExternalId
and not isinstance(property_.value_type, ClassEntity)
):
class_property_with_data_value_type.append((property_.class_.suffix, property_.property_))
Expand Down
32 changes: 25 additions & 7 deletions cognite/neat/rules/models/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
from functools import total_ordering
from typing import Annotated, Any, ClassVar, Generic, TypeVar, cast

from cognite.client.data_classes.data_modeling.ids import ContainerId, DataModelId, NodeId, PropertyId, ViewId
from cognite.client.data_classes.data_modeling.ids import (
ContainerId,
DataModelId,
NodeId,
PropertyId,
ViewId,
)
from pydantic import (
AnyHttpUrl,
BaseModel,
Expand Down Expand Up @@ -263,9 +269,9 @@ def id(self) -> str:


class AssetFields(StrEnum):
external_id = "external_id"
externalId = "externalId"
name = "name"
parent_external_id = "parent_external_id"
parentExternalId = "parentExternalId"
description = "description"
metadata = "metadata"

Expand Down Expand Up @@ -331,7 +337,8 @@ def _parse(cls, raw: str) -> dict:
else:
return {
"types": [
DataType.load(type_) if DataType.is_data_type(type_) else ClassEntity.load(type_) for type_ in types
(DataType.load(type_) if DataType.is_data_type(type_) else ClassEntity.load(type_))
for type_ in types
]
}

Expand Down Expand Up @@ -447,7 +454,10 @@ class ViewPropertyEntity(DMSVersionedEntity[PropertyId]):
property_: str = Field(alias="property")

def as_id(self) -> PropertyId:
return PropertyId(source=ViewId(self.space, self.external_id, self.version), property=self.property_)
return PropertyId(
source=ViewId(self.space, self.external_id, self.version),
property=self.property_,
)

def as_view_id(self) -> ViewId:
return ViewId(space=self.space, external_id=self.external_id, version=self.version)
Expand All @@ -459,7 +469,10 @@ def from_id(cls, id: PropertyId) -> "ViewPropertyEntity":
if id.source.version is None:
raise ValueError("Version must be specified")
return cls(
space=id.source.space, externalId=id.source.external_id, version=id.source.version, property=id.property
space=id.source.space,
externalId=id.source.external_id,
version=id.source.version,
property=id.property,
)


Expand Down Expand Up @@ -495,7 +508,12 @@ class ReferenceEntity(ClassEntity):
@classmethod
def from_entity(cls, entity: Entity, property_: str) -> "ReferenceEntity":
if isinstance(entity, ClassEntity):
return cls(prefix=str(entity.prefix), suffix=entity.suffix, version=entity.version, property=property_)
return cls(
prefix=str(entity.prefix),
suffix=entity.suffix,
version=entity.version,
property=property_,
)
else:
return cls(prefix=str(entity.prefix), suffix=entity.suffix, property=property_)

Expand Down
14 changes: 13 additions & 1 deletion cognite/neat/utils/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class UploadResultList(NeatList[UploadResultCore]): ...
@dataclass
class UploadResult(UploadResultCore, Generic[T_ID]):
created: set[T_ID] = field(default_factory=set)
upserted: set[T_ID] = field(default_factory=set)
deleted: set[T_ID] = field(default_factory=set)
changed: set[T_ID] = field(default_factory=set)
unchanged: set[T_ID] = field(default_factory=set)
Expand All @@ -53,12 +54,21 @@ def failed(self) -> int:

@property
def success(self) -> int:
return len(self.created) + len(self.deleted) + len(self.changed) + len(self.unchanged) + len(self.skipped)
return (
len(self.created)
+ len(self.deleted)
+ len(self.changed)
+ len(self.upserted)
+ len(self.unchanged)
+ len(self.skipped)
)

def dump(self, aggregate: bool = True) -> dict[str, Any]:
output = super().dump(aggregate)
if self.created:
output["created"] = len(self.created) if aggregate else list(self.created)
if self.upserted:
output["upserted"] = len(self.upserted) if aggregate else list(self.upserted)
if self.deleted:
output["deleted"] = len(self.deleted) if aggregate else list(self.deleted)
if self.changed:
Expand All @@ -69,6 +79,8 @@ def dump(self, aggregate: bool = True) -> dict[str, Any]:
output["skipped"] = len(self.skipped) if aggregate else list(self.skipped)
if self.failed_created:
output["failed_created"] = len(self.failed_created) if aggregate else list(self.failed_created)
if self.failed_upserted:
output["failed_upserted"] = len(self.failed_upserted) if aggregate else list(self.failed_upserted)
if self.failed_changed:
output["failed_changed"] = len(self.failed_changed) if aggregate else list(self.failed_changed)
if self.failed_deleted:
Expand Down
9 changes: 9 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ Changes are grouped as follows:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [0.87.3] - 18-07-24
### Added
- Handling of missing parents when generating assets
- Concept of orphanage asset for assets whose parents do not exist
- Uploader to CDF for assets
### Fixed
- Issue of not loading all asset fields when calling the `AssetWrite.load()` method


## [0.87.2] - 17-07-24
### Added
- Topological sorting of classes and properties in `AssetRules` to provide proper order of asset creation
Expand Down
Binary file modified docs/artifacts/rules/asset-architect-jimbo.xlsx
Binary file not shown.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "cognite-neat"
version = "0.87.2"
version = "0.87.3"
readme = "README.md"
description = "Knowledge graph transformation"
authors = [
Expand Down
Binary file modified tests/data/asset-architect-test.xlsx
Binary file not shown.
Loading
Loading