Skip to content

Commit

Permalink
[NEAT-33] Exporter DMS Workflow (#286)
Browse files Browse the repository at this point in the history
* feat: Exporter Step

* fix: creation of workflow

* refactor: Update export to cdf workflow

* refactor: additional testing

* fix: bug in importer

* refactor: fix

* feat: added export DMS

* refactor: fix line endings
  • Loading branch information
doctrino authored Mar 1, 2024
1 parent 1d72aa4 commit 7b4b129
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 26 deletions.
17 changes: 17 additions & 0 deletions cognite/neat/rules/exporters/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,20 @@ def total(self) -> int:
@property
def failed(self) -> int:
return self.failed_created + self.failed_changed

def as_report_str(self) -> str:
    """Summarize the upload counters as one human-readable line.

    Only non-zero counters appear in the output, e.g.
    ``"Views: created 2, skipped 1"``. When every counter is zero the
    result is just the title-cased resource name followed by ": ".
    """
    # (text, counter) pairs in the fixed report order; a pair is kept
    # only when its counter is non-zero.
    parts = [
        text
        for text, count in (
            (f"created {self.created}", self.created),
            (f"updated {self.changed}", self.changed),
            (f"skipped {self.skipped}", self.skipped),
            (f"unchanged {self.unchanged}", self.unchanged),
            (f"failed to create {self.failed_created}", self.failed_created),
            (f"failed to update {self.failed_changed}", self.failed_changed),
        )
        if count
    ]
    return f"{self.name.title()}: {', '.join(parts)}"
45 changes: 32 additions & 13 deletions cognite/neat/rules/exporters/_rules2dms.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,16 @@ class DMSExporter(BaseExporter[DMSSchema]):
def __init__(
    self,
    rules: DMSRules,
    export_components: frozenset[Literal["all", "spaces", "data_models", "views", "containers"]] = frozenset(
        {"all"}
    ),
    include_space: set[str] | None = None,
    existing_handling: Literal["fail", "skip", "update"] = "update",
):
    """Store the export configuration; no schema is built here.

    Args:
        rules: The DMS rules to export.
        export_components: Which schema component kinds to export;
            ``{"all"}`` selects every kind.
        include_space: Restrict the export to these spaces, or ``None``
            for no restriction.
        existing_handling: What to do when a component already exists
            in CDF: ``"fail"``, ``"skip"``, or ``"update"``.
    """
    # Cache for the generated schema, populated lazily by export().
    self._schema: DMSSchema | None = None
    self.rules = rules
    self.include_space = include_space
    self.existing_handling = existing_handling
    self.export_components = export_components

def export_to_file(self, filepath: Path) -> None:
Expand All @@ -58,13 +62,13 @@ def export(self) -> DMSSchema:
def export_to_cdf(self, client: CogniteClient, dry_run: bool = False) -> Iterable[UploadResult]:
schema = self.export()
to_export: list[tuple[CogniteResourceList, DataModelingLoader]] = []
if self.export_components in {"all", "spaces"}:
if self.export_components.intersection({"all", "spaces"}):
to_export.append((schema.spaces, SpaceLoader(client)))
if self.export_components in {"all", "containers"}:
if self.export_components.intersection({"all", "containers"}):
to_export.append((schema.containers, ContainerLoader(client)))
if self.export_components in {"all", "views"}:
if self.export_components.intersection({"all", "views"}):
to_export.append((schema.views, ViewLoader(client)))
if self.export_components in {"all", "data_models"}:
if self.export_components.intersection({"all", "data_models"}):
to_export.append((schema.data_models, DataModelLoader(client)))

for items, loader in to_export:
Expand All @@ -83,9 +87,22 @@ def export_to_cdf(self, client: CogniteClient, dry_run: bool = False) -> Iterabl
else:
to_update.append(item)
created = len(to_create)
changed = len(to_update)
failed_created = 0
failed_changed = 0

skipped = 0
if self.existing_handling == "update":
changed = len(to_update)
failed_changed = 0
elif self.existing_handling == "skip":
changed = 0
failed_changed = 0
skipped += len(to_update)
elif self.existing_handling == "fail":
failed_changed = len(to_update)
changed = 0
else:
raise ValueError(f"Unsupported existing_handling {self.existing_handling}")

error_messages: list[str] = []
if not dry_run:
to_create = loader.sort_by_dependencies(to_create)
Expand All @@ -96,18 +113,20 @@ def export_to_cdf(self, client: CogniteClient, dry_run: bool = False) -> Iterabl
created -= failed_created
error_messages.append(e.message)

try:
loader.update(to_update)
except CogniteAPIError as e:
failed_changed = len(e.failed) + len(e.unknown)
changed -= failed_changed
error_messages.append(e.message)
if self.existing_handling == "update":
try:
loader.update(to_update)
except CogniteAPIError as e:
failed_changed = len(e.failed) + len(e.unknown)
changed -= failed_changed
error_messages.append(e.message)

yield UploadResult(
name=loader.resource_name,
created=len(to_create),
changed=len(to_update),
unchanged=len(unchanged),
skipped=skipped,
failed_created=failed_created,
failed_changed=failed_changed,
error_messages=error_messages,
Expand Down
2 changes: 1 addition & 1 deletion cognite/neat/rules/importers/_spreadsheet2rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ExcelImporter(BaseImporter):
def __init__(self, filepath: Path):
    """Create an importer for the Excel rules workbook at *filepath* (the file is read in ``to_rules``)."""
    self.filepath = filepath

def to_rules(self, role: RoleTypes | str | None = None) -> DomainRules | InformationRules | DMSRules:
def to_rules(self, role: RoleTypes | None = None) -> DomainRules | InformationRules | DMSRules:
excel_file = pd.ExcelFile(self.filepath)

try:
Expand Down
70 changes: 70 additions & 0 deletions cognite/neat/workflows/examples/Export DMS/workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
configs: []
description: null
implementation_module: null
name: Export DMS
steps:
- complex_configs: {}
configs: {}
description: null
enabled: true
id: step_861205
label: Upload Rules Spreadsheet
max_retries: 0
method: null
params:
file_type: rules
retry_delay: 3
stype: file_uploader
system_component_id: null
transition_to:
- step_295479
trigger: true
ui_config:
pos_x: 630
pos_y: 69
- complex_configs: {}
configs:
Report Formatter: html
role: infer
description: null
enabled: true
id: step_295479
label: Validate
max_retries: 0
method: ImportExcelValidator
params: {}
retry_delay: 3
stype: stdstep
system_component_id: null
transition_to:
- step_50885
trigger: false
ui_config:
pos_x: 630
pos_y: 180
- complex_configs:
components:
containers: true
data_models: true
spaces: true
views: true
configs:
dry_run: 'False'
existing_component_handling: update
multi_space_components_create: 'True'
description: null
enabled: true
id: step_50885
label: Export Data Model to CDF
max_retries: 0
method: ExportDataModelStorage
params: {}
retry_delay: 3
stype: stdstep
system_component_id: null
transition_to: []
trigger: false
ui_config:
pos_x: 630
pos_y: 274
system_components: []
9 changes: 3 additions & 6 deletions cognite/neat/workflows/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,9 @@ def save_workflow_to_storage(self, name: str, custom_implementation_module: str
full_path = self.workflows_storage_path / name / "workflow.yaml"
full_path.parent.mkdir(parents=True, exist_ok=True)
wf = self.workflow_registry[name]
with full_path.open("w") as f:
f.write(
wf.serialize_workflow(
output_format="yaml", custom_implementation_module=custom_implementation_module
)
)
full_path.write_text(
wf.serialize_workflow(output_format="yaml", custom_implementation_module=custom_implementation_module)
)

def create_new_workflow(self, name: str, description=None, mode: str = "manifest") -> WorkflowDefinition:
"""Create new workflow in memory"""
Expand Down
122 changes: 120 additions & 2 deletions cognite/neat/workflows/steps/lib/rules_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

import cognite.neat.graph.extractors._graph_capturing_sheet
from cognite.neat.exceptions import wrangle_warnings
from cognite.neat.rules import exporter
from cognite.neat.rules import exporter, exporters
from cognite.neat.rules.exporter._rules2dms import DMSSchemaComponents
from cognite.neat.rules.exporter._rules2graphql import GraphQLSchema
from cognite.neat.rules.exporter._rules2ontology import Ontology
from cognite.neat.utils.utils import generate_exception_report
from cognite.neat.workflows._exceptions import StepNotInitialized
from cognite.neat.workflows.model import FlowMessage, StepExecutionStatus
from cognite.neat.workflows.steps.data_contracts import CogniteClient, DMSSchemaComponentsData, RulesData
from cognite.neat.workflows.steps.data_contracts import CogniteClient, DMSSchemaComponentsData, MultiRuleData, RulesData
from cognite.neat.workflows.steps.step_model import Configurable, Step

__all__ = [
Expand All @@ -28,6 +28,7 @@
"ExportRulesToExcel",
"GenerateDMSSchemaComponentsFromRules",
"DeleteDMSSchemaComponents",
"ExportDataModelStorage",
]

CATEGORY = __name__.split(".")[-1].replace("_", " ").title()
Expand Down Expand Up @@ -511,3 +512,120 @@ def run(self, rules_data: RulesData) -> FlowMessage: # type: ignore[override, s
full_path = Path(self.data_store_path) / Path(self.configs["output_file_path"])
exporter.ExcelExporter.from_rules(rules=rules_data.rules).export_to_file(filepath=full_path)
return FlowMessage(output_text="Generated Excel file from rules")


class ExportDataModelStorage(Step):
    """Export generated DMS schema components (spaces, containers, views,
    data models) to CDF.

    The step also stages a zipped copy of the schema and a plain-text
    creation report under ``<data_store_path>/staging`` for download.
    """

    description = "This step exports generated DMS Schema to CDF."
    version = "private-beta"
    category = CATEGORY

    configurables: ClassVar[list[Configurable]] = [
        Configurable(
            name="dry_run",
            value="False",
            label="Whether to perform a dry run of the export. ",
            options=["True", "False"],
        ),
        Configurable(
            name="components",
            type="multi_select",
            value="",
            label="Select which DMS schema component(s) to export to CDF",
            options=["spaces", "containers", "views", "data_models"],
        ),
        Configurable(
            name="existing_component_handling",
            value="fail",
            # Fixed garbled wording in the original label
            # ("...or Update the component try to update the component.").
            label=(
                "How to handle components that already exist in CDF: "
                "'fail' the step if any component already exists, "
                "'skip' components that already exist, "
                "or 'update' to attempt updating existing components."
            ),
            options=["fail", "skip", "update"],
        ),
        Configurable(
            name="multi_space_components_create",
            value="False",
            label=(
                "Whether to create only components belonging to the data model space"
                " (i.e. space define under Metadata sheet of Rules), "
                "or also additionally components outside of the data model space."
            ),
            options=["True", "False"],
        ),
    ]

    def run(self, rules: MultiRuleData, cdf_client: CogniteClient) -> FlowMessage:  # type: ignore[override, syntax]
        """Export the DMS rules to CDF and stage the schema zip and report.

        Args:
            rules: Input rules; must carry a DMS rules object (``rules.dms``).
            cdf_client: Client used to write the schema components to CDF.

        Returns:
            A FlowMessage with download links; marked ABORT_AND_FAIL when
            nothing was selected, DMS rules are missing, or the export
            reported errors.

        Raises:
            StepNotInitialized: If the step was not configured before running.
        """
        if self.configs is None or self.data_store_path is None:
            raise StepNotInitialized(type(self).__name__)
        existing_components_handling = cast(
            Literal["fail", "update", "skip"], self.configs["existing_component_handling"]
        )
        multi_space_components_create: bool = self.configs["multi_space_components_create"] == "True"
        # Only components ticked in the multi-select UI are exported.
        components_to_create = {
            cast(Literal["all", "spaces", "data_models", "views", "containers"], key)
            for key, value in self.complex_configs["components"].items()
            if value
        }
        dry_run = self.configs["dry_run"] == "True"

        if not components_to_create:
            return FlowMessage(
                error_text="No DMS Schema components selected for upload! Please select minimum one!",
                step_execution_status=StepExecutionStatus.ABORT_AND_FAIL,
            )
        dms_rules = rules.dms
        if dms_rules is None:
            return FlowMessage(
                error_text="Missing DMS rules in the input data! Please ensure that a DMS rule is provided!",
                step_execution_status=StepExecutionStatus.ABORT_AND_FAIL,
            )

        dms_exporter = exporters.DMSExporter(
            dms_rules,
            export_components=frozenset(components_to_create),
            # include_space=None means "no space restriction"; otherwise the
            # export is limited to the data model's own space.
            include_space=None if multi_space_components_create else {dms_rules.metadata.space},
            existing_handling=existing_components_handling,
        )

        # Create the staging directory once; it holds both the schema zip
        # and the creation report (the original created it twice).
        output_dir = self.data_store_path / Path("staging")
        output_dir.mkdir(parents=True, exist_ok=True)

        schema_zip = f"{dms_rules.metadata.external_id}.zip"
        schema_full_path = output_dir / schema_zip
        dms_exporter.export_to_file(schema_full_path)

        report_lines = ["# DMS Schema Export to CDF\n\n"]
        errors: list[str] = []
        for result in dms_exporter.export_to_cdf(client=cdf_client, dry_run=dry_run):
            report_lines.append(result.as_report_str())
            errors.extend(result.error_messages)

        report_lines.append("\n\n# ERRORS\n\n")
        report_lines.extend(errors)

        report_file = "dms_component_creation_report.txt"
        report_full_path = output_dir / report_file
        report_full_path.write_text("\n".join(report_lines))

        # The time.time() query parameter busts browser caching of the
        # staged files so the links always serve the latest artifacts.
        output_text = (
            "<p></p>"
            "Download DMS Export Report"
            f'<a href="/data/staging/{report_file}?{time.time()}" '
            f'target="_blank">report</a>'
            "<p></p>"
            "Download DMS exported schema"
            f'- <a href="/data/staging/{schema_zip}?{time.time()}" '
            f'target="_blank">{schema_zip}</a>'
        )

        if errors:
            return FlowMessage(error_text=output_text, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL)
        else:
            return FlowMessage(output_text=output_text)
11 changes: 7 additions & 4 deletions cognite/neat/workflows/steps/lib/rules_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,12 +644,15 @@ def run(self, flow_message: FlowMessage) -> (FlowMessage, MultiRuleData): # typ
except (KeyError, TypeError):
error_text = "Expected input payload to contain 'full_path' key."
return FlowMessage(error_text=error_text, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL)
role = self.configs.get("role", "infer")
# is role is None, it will be inferred from the rules file
role = self.configs.get("role")
if role == "infer":
role = None
excel_importer = importers.ExcelImporter(rules_file_path)
try:
rules = excel_importer.to_rules(role)
except ValueError:
error_text = "Failed to validate rules. Please check the rules file."
rules = excel_importer.to_rules(role) # type: ignore[arg-type]
except ValueError as e:
error_text = f"Failed to validate rules. Please check the rules file. {e}"
return FlowMessage(error_text=error_text, step_execution_status=StepExecutionStatus.ABORT_AND_FAIL)

output_text = "Rules validation passed successfully!"
Expand Down
Empty file.
Empty file.
20 changes: 20 additions & 0 deletions tests/tests_unit/rules/test_importers/test_excel_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pathlib import Path

import pytest

from cognite.neat.rules.importers import ExcelImporter
from tests.config import DOC_KNOWLEDGE_ACQUISITION_TUTORIAL


def valid_rules_filepaths():
    """Yield a pytest param for each rules spreadsheet known to be valid."""
    yield pytest.param(
        DOC_KNOWLEDGE_ACQUISITION_TUTORIAL / "cdf-dms-architect-alice.xlsx",
        id="Alice rules",
    )


class TestExcelImporter:
    """Known-good rules spreadsheets import and validate without raising."""

    @pytest.mark.parametrize("filepath", valid_rules_filepaths())
    def test_import_valid_rules(self, filepath: Path):
        # A truthy rules object signals a successful import.
        rules = ExcelImporter(filepath).to_rules()

        assert rules

0 comments on commit 7b4b129

Please sign in to comment.