diff --git a/src/vss_tools/vspec/vspec.py b/src/vss_tools/vspec/vspec.py index 96a66d6a..2bdfc194 100644 --- a/src/vss_tools/vspec/vspec.py +++ b/src/vss_tools/vspec/vspec.py @@ -45,6 +45,7 @@ def cli(ctx: click.Context, log_level: str, log_file: Path): "jsonschema": "vss_tools.vspec.vssexporters.vss2jsonschema:cli", "protobuf": "vss_tools.vspec.vssexporters.vss2protobuf:cli", "yaml": "vss_tools.vspec.vssexporters.vss2yaml:cli", + "samm": "vss_tools.vspec.vssexporters.vss2samm.vss2samm:cli", }, ) @click.pass_context diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/config/config.py b/src/vss_tools/vspec/vssexporters/vss2samm/config/config.py new file mode 100644 index 00000000..dbfbbcc7 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/config/config.py @@ -0,0 +1,34 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +# General CONFIG variables +SAMM_TYPE = 'samm' +SAMM_VERSION = '2.1.0' +# Custom string, which we use to escape " and ' characters in VSS node description/comments +# Used in fileHelper.write_graph_to_file to properly escape characters in filedata, before to write it to a file. +CUSTOM_ESCAPE_CHAR = '#V2E-ESC-CHAR#' + +# CONFIG Variable defined at runtime as per user input and in available init function +OUTPUT_NAMESPACE = None +VSPEC_VERSION = None +SPLIT_DEPTH = None + + +def init(output_namespace: str, vspec_version: str, split_depth: int): + + # Set user defined or OUTPUT_NAMESPACE + global OUTPUT_NAMESPACE + OUTPUT_NAMESPACE = output_namespace + + # Set user defined or OUTPUT_NAMESPACE + global VSPEC_VERSION + VSPEC_VERSION = vspec_version + + # Make sure that split_depth is in correct type and value, else set it to DEFAULT: 1 + global SPLIT_DEPTH + SPLIT_DEPTH = split_depth if (type(split_depth) is int and split_depth > 0) else 1 diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/dataTypesAndUnits.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/dataTypesAndUnits.py new file mode 100644 index 00000000..47adcaf6 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/dataTypesAndUnits.py @@ -0,0 +1,80 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +from rdflib import XSD +from . namespaces import get_unit_uri + +DataTypes = { + 'uint8' : XSD.unsignedByte, # noqa: E203 + 'int8' : XSD.byte, # noqa: E203 + 'uint16' : XSD.unsignedShort, # noqa: E203 + 'int16' : XSD.short, # noqa: E203 + 'uint32' : XSD.unsignedInt, # noqa: E203 + 'int32' : XSD.int, # noqa: E203 + 'uint64' : XSD.unsignedLong, # noqa: E203 + 'int64' : XSD.long, # noqa: E203 + 'boolean' : XSD.boolean, # noqa: E203 + 'float' : XSD.float, # noqa: E203 + 'double' : XSD.double, # noqa: E203 + 'string' : XSD.string, # noqa: E203 + 'dateTime' : XSD.dateTime, # noqa: E203 + 'dateTimeStamp' : XSD.dateTimeStamp, # noqa: E203 + 'iso8601' : XSD.dateTimeStamp, # noqa: E203 + 'anyURI' : XSD.anyURI # noqa: E203 +} + +DataUnits = { + 'cm3' : get_unit_uri('cubicCentimetre'), # noqa: E203 + 'cm^3' : get_unit_uri('cubicCentimetre'), # noqa: E203 + 'kw' : get_unit_uri('kilowatt'), # noqa: E203 + 'kW' : get_unit_uri('kilowatt'), # noqa: E203 + 'kWh' : get_unit_uri('kilowattHour'), # noqa: E203 + 'l' : get_unit_uri('litre'), # noqa: E203 + 'l/100km' : get_unit_uri('litrePerHour'), # noqa: E203 + 'mm' : get_unit_uri('millimetre'), # noqa: E203 + 'kg' : get_unit_uri('kilogram'), # noqa: E203 + 'inch' : get_unit_uri('inch'), # noqa: E203 + 'A' : get_unit_uri('ampere'), # noqa: E203 + 'Ah' : get_unit_uri('ampereHour'), # noqa: E203 + 'Nm' : get_unit_uri('newtonMetre'), # noqa: E203 + 'N.m' : get_unit_uri('newtonMetre'), # noqa: E203 + 'V' : get_unit_uri('volt'), # noqa: E203 + 'celsius' : get_unit_uri('degreeCelsius'), # noqa: E203 + 'cm/s' : get_unit_uri('centimetrePerSecond'), # noqa: E203 + 'degree' : get_unit_uri('degreeUnitOfAngle'), # noqa: E203 + 'degrees' : get_unit_uri('degreeUnitOfAngle'), # noqa: E203 + 'degrees/s' : get_unit_uri('degreePerSecond'), # noqa: E203 + 'g/s' : get_unit_uri('gramPerSecond'), # noqa: E203 + 'kilometer' : get_unit_uri('kilometre'), # noqa: E203 + 'km' : get_unit_uri('kilometre'), # noqa: E203 + 'km/h' : get_unit_uri('kilometrePerHour'), # noqa: E203 + 'kpa' : get_unit_uri('kilopascal'), # noqa: E203 + 'kPa' : get_unit_uri('kilopascal'), # noqa: E203 + 'l/h' : get_unit_uri('litrePerHour'), # noqa: E203 + 'm' : get_unit_uri('metre'), # noqa: E203 + 'm/s' : get_unit_uri('metrePerSecond'), # noqa: E203 + 'm/s2' : get_unit_uri('metrePerSecondSquared'), # noqa: E203 + 'm/s^2' : get_unit_uri('metrePerSecondSquared'), # noqa: E203 + 'mbar' : get_unit_uri('millibar'), # noqa: E203 + 'min' : get_unit_uri('minuteUnitOfTime'), # noqa: E203 + 'ml' : get_unit_uri('millilitre'), # noqa: E203 + 'pa' : get_unit_uri('pascal'), # noqa: E203 + 'Pa' : get_unit_uri('pascal'), # noqa: E203 + 'percent' : get_unit_uri('percent'), # noqa: E203 + 'percentage' : get_unit_uri('percent'), # noqa: E203 + 'ratio' : get_unit_uri('rate'), # noqa: E203 + 'rpm' : get_unit_uri('revolutionsPerMinute'), # noqa: E203 + 'g/km' : get_unit_uri('kilogramPerKilometre'), # noqa: E203 + 's' : get_unit_uri('secondUnitOfTime'), # noqa: E203 + 'h' : get_unit_uri('secondUnitOfTime'), # noqa: E203 + 'W' : get_unit_uri('watt'), # noqa: E203 + 'cpm' : get_unit_uri('cycle'), # noqa: E203 + 'bpm' : get_unit_uri('cycle'), # noqa: E203 + 'iso8601' : get_unit_uri('secondUnitOfTime'), # noqa: E203 + 'blank' : get_unit_uri('blank') # noqa: E203 +} diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/fileHelper.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/fileHelper.py new file mode 100644 index 00000000..c3186a9b --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/fileHelper.py @@ -0,0 +1,58 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +import os +from rdflib import Graph +from vss_tools import log +from .. config import config as cnfg + + +# Write RDF Graph data to specified file +def write_graph_to_file(path_to_file: str, file_name: str, graph: Graph): + log.debug( + "Writing RDF Graph to \n -- file: '%s' \n -- location: '%s'\n -- current working directory: '%s'\n", + file_name, + path_to_file, + os.getcwd()) + + filedata = graph.serialize(format='ttl') + + # Clean up entries like: samm:operations "()" OR samm:operations "( )" + filedata = filedata.replace(' \"()\" ', ' () ') + filedata = filedata.replace(' \"( )\" ', ' ( ) ') + filedata = filedata.replace(' \"( ', ' ( ') + filedata = filedata.replace(' )\" ', ' ) ') + + # Cleanup other escape characters, that were introduced automatically by some of used libraries/tools + filedata = filedata.replace('\\', '') + + # Cleanup some CUSTOM ESCAPED, by this script characters. + # Usually double and single quotes in node.description or node.comment field + filedata = filedata.replace(cnfg.CUSTOM_ESCAPE_CHAR, '\\') + + # Cleanup xsd:anyURI with xsd:double + filedata = filedata.replace('xsd:anyURI', 'xsd:double') + + # Make sure that path_to_file ends with a back slash: \ + if not str(path_to_file).endswith('\\'): + path_to_file = f"{path_to_file}\\" + + # Create and open ttl file for writing + output_file = f"{path_to_file}{file_name}.ttl" + os.makedirs(os.path.dirname(output_file), exist_ok=True) + file_writer = open(output_file, "w") + + # Write filedata to file + file_writer.write(filedata) + + # Add new line and close the file + file_writer.write("\n") + file_writer.close() + + # Return file location + return output_file diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/namespaces.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/namespaces.py new file mode 100644 index 00000000..781a17aa --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/namespaces.py @@ -0,0 +1,47 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +from rdflib import URIRef +from vss_tools import log +from .. config import config as cnfg + + +def get_vspec_uri(node_name: str): + return URIRef(f'{samm_output_namespace}{node_name}') + + +def get_node_name_from_vspec_uri(node_uri: URIRef): + return node_uri.replace(samm_output_namespace, '') + + +def get_unit_uri(unit_name: str): + return URIRef(f"{samm_base_namespace}:unit:{cnfg.SAMM_VERSION}#{unit_name}") + + +log.debug( + "VSS to SAMM CONFIG:\n -- SAMM_TYPE : %s\n -- SAMM_VERSION: %s\n", + cnfg.SAMM_TYPE, + cnfg.SAMM_VERSION +) + +# NOTE: base_name is more for the ESMF core libraries +# TODO: read https://eclipse-esmf.github.io/samm-specification/snapshot/namespaces.html +# and make sure these are more abstract and can be configured for particular project we use +samm_prefix = "urn:samm" +samm_base_namespace = f"{samm_prefix}:org.eclipse.esmf.samm" +Namespaces = { + 'samm': f"{samm_base_namespace}:meta-model:{cnfg.SAMM_VERSION}#", + 'samm-c': f"{samm_base_namespace}:characteristic:{cnfg.SAMM_VERSION}#", + 'samm-e': f"{samm_base_namespace}:entity:{cnfg.SAMM_VERSION}#", + 'unit': f"{samm_base_namespace}:unit:{cnfg.SAMM_VERSION}#", +} + +# Below formatted namespace should look like: urn:samm:com.covesa.vss.spec:5.0.0# +# and is used for the ":" bindings of the converted to TTLs, VSS Aspect models +# that will refer to the user specified output_namespace +samm_output_namespace = f"{samm_prefix}:{cnfg.OUTPUT_NAMESPACE}:{cnfg.VSPEC_VERSION}#" diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/sammConcepts.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/sammConcepts.py new file mode 100644 index 00000000..a0a7d64c --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/sammConcepts.py @@ -0,0 +1,110 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + + +from enum import Enum +from rdflib import URIRef + +from .. config import config as cnfg +from . namespaces import samm_base_namespace, samm_output_namespace + + +class VSSConcepts (Enum): + EMPTY = "" # noqa: E221 + BELONGS_TO = "belongsToVehicleComponent" # noqa: E221 + HOLDS_VALUE = "holdsState" # noqa: E221 + HAS_SIGNAL = "hasDynamicVehicleProperty" # noqa: E221 + HAS_ATTRIBUTE = "hasStaticVehicleProperty" # noqa: E221 + PART_OF_VEHICLE = "partOfVehicle" # noqa: E221 + PART_OF_VEH_COMP = "partOf" # noqa: E221 + HAS_COMP_INST = "hasInstance" # noqa: E221 + VEHICLE = "Vehicle" # noqa: E221 + VEHICLE_SIGNAL = "ObservableVehicleProperty" # noqa: E221 + VEHICLE_ACT = "ActuatableVehicleProperty" # noqa: E221 + VEHICLE_COMP = "VehicleComponent" # noqa: E221 + VEHICLE_PROP = "DynamicVehicleProperty" # noqa: E221 + VEHICLE_STAT = "StaticVehicleProperty" # noqa: E221 + + def __init__(self, vss_name): + self.ns = samm_output_namespace + self.vsso_name = vss_name + + @property + def uri(self): + return URIRef(self.uri_string) + + @property + def uri_string(self): + return f'{self.ns}{self.value}' + + +class SammConcepts (Enum): + ASPECT = "Aspect" # noqa: E221 + PROPERTY = "Property" # noqa: E221 + ENTITY = "Entity" # noqa: E221 + CHARACTERISTIC = "Characteristic" # noqa: E221 + NAME = "name" # noqa: E221 + CHARACTERISTIC_RELATION = "characteristic" # noqa: E221 + PREFERRED_NAME = "preferredName" # noqa: E221 + PAYLOAD_NAME = "payloadName" # noqa: E221 + DESCRIPTION = "description" # noqa: E221 + PROPERTIES = "properties" # noqa: E221 + OPERATIONS = "operations" # noqa: E221 + EVENTS = "events" # noqa: E221 + DATA_TYPE = "dataType" # noqa: E221 + OPTIONAL = "optional" # noqa: E221 + EXAMPLE_VALUE = "exampleValue" # noqa: E221 + + def __init__(self, vss_name): + self.ns = f"{samm_base_namespace}:meta-model:{cnfg.SAMM_VERSION}#" + self.vsso_name = vss_name + + @property + def uri(self): + return URIRef(self.uri_string) + + @property + def uri_string(self): + return f'{self.ns}{self.value}' + + +# TODO-Kosta: find way to combine SammConcepts and SammCConcepts into one general class +# and just override the init or extra entries since SammCConcepts has few more +class SammCConcepts (Enum): + ASPECT = "Aspect" # noqa: E221 + PROPERTY = "Property" # noqa: E221 + ENTITY = "Entity" # noqa: E221 + SINGLE_ENTITY = "SingleEntity" # noqa: E221 + STATE = "State" # noqa: E221 + TRAIT = "Trait" # noqa: E221 + TIMESTAMP = "Timestamp" # noqa: E221 + RANGE_CONSTRAINT = "RangeConstraint" # noqa: E221 + MEASUREMENT = "Measurement" # noqa: E221 + QUANTIFIABLE = "Quantifiable" # noqa: E221 + LIST = "List" # noqa: E221 + ENUM = "Enumeration" # noqa: E221 + BOOLEAN = "Boolean" # noqa: E221 + VALUES = "values" # noqa: E221 + UNIT = "unit" # noqa: E221 + CONSTRAINT = "constraint" # noqa: E221 + BASE_CHARACTERISTICS = "baseCharacteristic" # noqa: E221 + MAX_VALUE = "maxValue" # noqa: E221 + MIN_VALUE = "minValue" # noqa: E221 + DEFAULT_VALUE = "defaultValue" # noqa: E221 + + def __init__(self, vss_name): + self.ns = f"{samm_base_namespace}:characteristic:{cnfg.SAMM_VERSION}#" + self.vsso_name = vss_name + + @property + def uri(self): + return URIRef(self.uri_string) + + @property + def uri_string(self): + return f'{self.ns}{self.value}' diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/stringHelper.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/stringHelper.py new file mode 100644 index 00000000..607c00d7 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/stringHelper.py @@ -0,0 +1,114 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + + +def str_to_lc_first(string_to_update: str): + if len(string_to_update) > 0: + return f"{string_to_update[0].lower()}{string_to_update[1:]}" + else: + return string_to_update + + +def str_to_uc_first(string_to_update: str): + if len(string_to_update) > 0: + return f"{string_to_update[0].upper()}{string_to_update[1:]}" + else: + return string_to_update + + +# Helper function, which will make sure that passed string_to_update +# will be converted in camel case, +# based on this guide: https://google.github.io/styleguide/javaguide.html#s5.3-camel-case +def str_to_camel_case(string_to_update: str): + updated_str = '' + + string_to_update_length = len(string_to_update) + + for char_index in range(0, string_to_update_length, 1): + char_to_read = string_to_update[char_index] + + if char_to_read.isnumeric() or len(updated_str) == 0: + # Read numeric characters and first one as they are + updated_str = updated_str + char_to_read + + else: + # Handle any subsequent character in camel case + prev_char = updated_str[char_index - 1] + + next_index = char_index + 1 + if string_to_update_length == next_index: + # Rollback next_index as we will get index out of boundaries error. + # In this case the next_char will be same as char_to_read. + next_index = next_index - 1 + + next_char = string_to_update[next_index] + + if char_to_read.isupper() \ + and (prev_char.isupper() or prev_char.islower() or len(prev_char.strip()) > 0) \ + and (next_char.isupper() or next_char.isnumeric()): + # Convert case of char_to_read from UPPER to LOWER + char_to_read = char_to_read.lower() + + elif len(prev_char.strip()) == 0 and next_char.islower(): + # Convert case of char_to_read from LOWER to UPPER + char_to_read = char_to_read.upper() + + # ELSE: read char_to_read as it is + + updated_str = updated_str + char_to_read + + # Make sure to remove any empty space from updated_str + return updated_str.replace(' ', '') + + +def str_to_uc_first_camel_case(string_to_update: str): + # Make sure that first character of string_to_update is UPPER CASE + string_to_update = str_to_uc_first(string_to_update) + + # Return converted to camelCase string + return str_to_camel_case(string_to_update) + + +def str_to_lc_first_camel_case(string_to_update: str): + # Make sure that first character of string_to_update is UPPER CASE + string_to_update = str_to_lc_first(string_to_update) + + # Return converted to camelCase string + return str_to_camel_case(string_to_update) + + +def str_camel_case_split(string_to_update: str): + updated_str = '' + + string_to_update_length = len(string_to_update) + + for char_index in range(0, string_to_update_length, 1): + char = string_to_update[char_index] + + if len(updated_str) == 0: + # Read first character as it is + updated_str = updated_str + char + + else: + # Read next character so to define whether to split or not + # NOTE: abbreviations in ALL UPPER CASE should not be split + next_index = char_index + 1 + if string_to_update_length == next_index: + next_index = next_index - 1 + next_char = string_to_update[next_index] + + if char.isupper() and next_char.islower(): + updated_str = updated_str + ' ' + char + else: + updated_str = updated_str + char + + return updated_str + + +def is_collection(data_type: str): + return data_type.endswith('[]') diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlBuilderHelper.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlBuilderHelper.py new file mode 100644 index 00000000..8914a8c4 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlBuilderHelper.py @@ -0,0 +1,645 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +from typing import Sequence +from rdflib import Graph, Literal, URIRef +from rdflib.namespace import XSD, RDF +from vss_tools import log +from vss_tools.vspec.model.vsstree import VSSNode + +from . import vssHelper as vssHelper +from . sammConcepts import VSSConcepts, SammConcepts, SammCConcepts +from . dataTypesAndUnits import DataTypes +from . namespaces import Namespaces, get_node_name_from_vspec_uri, get_vspec_uri +from . stringHelper import str_to_lc_first_camel_case, str_to_uc_first_camel_case, str_camel_case_split + + +# +# Builder helper, which provides a set of functions, to set up a TTL Graph, +# build various graph nodes like property, characteristic, entity etc. and add them to this graph, +# which then can be stored in an ESMF Aspect Model Editor loadable TTL file. +# + + +# Initialize an empty RDF Graph with related bindings and ESMF - AME namespaces +def setup_graph(): + # Create a Graph + graph = Graph() + + # Bind the namespaces to a prefix for more readable output + graph.bind(VSSConcepts.EMPTY.value, VSSConcepts.EMPTY.uri) + graph.bind("xsd", XSD) + + for nsKey in Namespaces: + graph.bind(nsKey, URIRef(Namespaces[nsKey])) + + return graph + + +# Build an RDF Tree Node as property for the provided vss_node and add it to the specified graph. +def add_graph_node( + graph: Graph, + vss_node: VSSNode, + is_aspect: bool): + node_property_name = vssHelper.get_node_property_name(vss_node) + node_uri = get_vspec_uri(node_property_name) + + # Initialize the Property node - tuple + # NOTE: a Tuple elements are: (Subject, Predicate, Object) + node_tuple = (node_uri, RDF.type, SammConcepts.PROPERTY.uri) + + if is_aspect: + add_node_aspect(graph, vss_node, node_uri) + # ELSE: just build a simple property node as usual + + # Add the node tuple and additional details (preferredName, description etc.) to the graph + graph.add(node_tuple) + + # Preferred name should be white space in front of each upper case letter + # i.e. more human friendly / readable name + # EXAMPLE: + # node.name : IsStrongCrossWindDetected + # should be like: Is Strong Cross Wind Detected + graph.add( + (node_uri, + SammConcepts.PREFERRED_NAME.uri, + Literal(str_camel_case_split(vss_node.ttl_name), "en"))) + + log.debug("Created graph node with URI: '%s'.\n", node_uri) + + return node_uri + + +def add_node_aspect(graph: Graph, vss_node: VSSNode, node_uri: URIRef): + # Initialize Aspect for current Graph. + log.debug("Add aspect node for node uri:\t -- '%s'.", node_uri) + + # NOTE: there can be only 1 Aspect per graph. For Aspect nodes, vss_node.ttl_name is same as vss_node.name + # Aspect Name must be UC FIRST CAMEL CASE + aspect_node_uri = get_vspec_uri(str_to_uc_first_camel_case(vss_node.ttl_name)) + + graph.add((aspect_node_uri, RDF.type, SammConcepts.ASPECT.uri)) + + # TODO: move to helper function which will check for existing node_uri etc!!! + graph.add(( + aspect_node_uri, + SammConcepts.PREFERRED_NAME.uri, + Literal(str_camel_case_split(vss_node.name), "en"))) + + # Add the property instance of this Tree node to its Aspect + graph.add(( + aspect_node_uri, + SammConcepts.PROPERTIES.uri, + Literal(f"( {get_property_uri_from_node_uri(node_uri)} )"))) + + # Add placeholders for Aspect operations and events + graph.add((aspect_node_uri, SammConcepts.OPERATIONS.uri, Literal("()"))) + graph.add((aspect_node_uri, SammConcepts.EVENTS.uri, Literal("()"))) + + +def add_node_branch_characteristic(graph: Graph, vss_node: VSSNode, node_uri: URIRef): + # NOTE: constraints are usually on a leaf node + # and will result in Node having a Trait with BaseCharacteristic and Constraint nodes. + node_char_name = get_node_characteristic_name(node_uri, vssHelper.has_constraints(vss_node)) + node_char_uri = get_vspec_uri(node_char_name) + + log.debug( + "Add branch characteristic: '%s' for VSSNode: '%s'.", + node_char_name, + vss_node.name + ) + + graph.add((node_char_uri, SammConcepts.NAME.uri, Literal(node_char_name))) + graph.add((node_char_uri, RDF.type, SammConcepts.CHARACTERISTIC.uri)) + + # Add description to this node's characteristic + # NOTE: Keep description under this vss_node's branch - node_char_uri, + # because if there is any instances, + # each of its instance(s) will point to node_char_uri + graph.add(( + node_char_uri, + SammConcepts.DESCRIPTION.uri, + Literal(vssHelper.get_node_description(vss_node), "en"))) + + return node_char_uri + + +def add_node_leaf(graph: Graph, node_uri: URIRef, vss_node: VSSNode): + log.debug( + "Add node for VSS Node: '%s' with path: '%s'", + vss_node.name, + vss_node.qualified_name() + ) + + has_limits = vssHelper.has_constraints(vss_node) + + node_char_name = get_node_characteristic_name(node_uri, has_limits) + node_char_uri = get_vspec_uri(node_char_name) + + # Bind current vss_node to its characteristic + graph.add((node_uri, SammConcepts.CHARACTERISTIC_RELATION.uri, node_char_uri)) + + if has_limits: + node_char_uri = add_node_leaf_constraint(graph, node_char_name, node_char_uri, vss_node) + + # Add description to this node's characteristic + # NOTE: this could be the general characteristic or trait, in case if vss_node has limits + graph.add(( + node_char_uri, + SammConcepts.DESCRIPTION.uri, + Literal(vssHelper.get_node_description(vss_node), "en"))) + + # Get RDF and Data types for specified node_characteristic_uri from its related vss_node + rdf_type = vssHelper.get_node_rdf_type(vss_node) + data_type = vssHelper.get_data_type(vss_node) + + match rdf_type: + case SammCConcepts.ENUM | SammCConcepts.STATE: + # Handle ENUM type nodes + log.debug("Set node ENUM values") + + if vss_node.default: + graph.add(( + node_char_uri, + SammCConcepts.DEFAULT_VALUE.uri, + Literal(vss_node.default, datatype=data_type))) + + # Read values for this vss_node's characteristic + enum_values = None + if (hasattr(vss_node, "allowed") and vss_node.allowed and type(vss_node.allowed) is list): + # Add allowed values to this node characteristic + enum_values = get_enum_values(vss_node.allowed) + else: + # NOTE: From VSS 3.0 the 'enum' attribute has been renamed to `allowed` + # However, we will keep this for backwards compatibility. + + # Add ENUM values, as usual, to this node characteristic + enum_values = get_enum_values(vss_node.enum) + + graph.add((node_char_uri, SammCConcepts.VALUES.uri, enum_values)) + + case SammCConcepts.LIST: + # Handle LIST type nodes + log.debug("Set node List values") + + # TODO: do we need to add values here? + + case SammCConcepts.MEASUREMENT: + # Handle MEASUREMENT (unit) type nodes + log.debug("Set node Measurement values") + + if vss_node.has_unit() and vss_node.unit: + graph.add(( + node_char_uri, + SammCConcepts.UNIT.uri, + vssHelper.get_data_unit_uri(vss_node.unit.value))) + + else: + log.warning( + "Unit is not set for vss node: '%s' with type: '%s'\n" + "Setting node 'RDF.type' from MEASUREMENT to default: CHARACTERISTIC\n", + vss_node.name, + vss_node.type + ) + + rdf_type = SammConcepts.CHARACTERISTIC + + if vss_node.default: + graph.add(( + node_uri, + SammConcepts.EXAMPLE_VALUE.uri, + Literal(vss_node.default, datatype=data_type))) + + case SammConcepts.CHARACTERISTIC: + # Handle CHARACTERISTIC type nodes + # TODO-Kosta: such inner logs, should be in the form: + # -- set regular node values + log.debug("Set regular node values") + + if vss_node.default: + if vss_node.has_unit() \ + and vss_node.unit == 'iso8601' \ + and data_type == DataTypes[vss_node.unit]: + # Handle date-time based nodes + log.debug("Set node DATE-TIME values") + + # NOTE: Some examples are provided in the form: 0000-01-01T00:00Z + # which FAILS ON THE ESMF AME VALIDATION because of the 0000 year. + # Correct format in ESMF AME is: + # yyyy-mm-ddThh:mmZ + # OR + # yyyy-mm-ddThh:mm:ss.milliseconds+hh:mm, where +hh:mm is the timezone + # + # Example: '2000-01-01T14:23:00', + # '0001-01-01T00:00:00.00000+00:00', + # '2023-11-27T16:26:05.671Z' + if not vss_node.default.startswith('0000'): + # Add property node exampleValue date time - TIMESTAMP, if is provided and valid + graph.add(( + node_uri, + SammConcepts.EXAMPLE_VALUE.uri, + Literal(vss_node.default, datatype=data_type))) + else: + log.warning( + "Skipping incorrect date time default value: '%s' for VSS Node: '%s'\n" + "CORRECT format + timezone is:\n" + " yyyy-mm-ddThh:mmZ\n" + "or\n" + " yyyy-mm-ddThh:mm:ss.milliseconds+hh:mm\n" + "where yyyy cannot be just 0000, i.e. 0001 is valid year, but 0000 is not valid.\n", + vss_node.default, + vss_node.name + ) + + else: + # Add default ONLY for other than date-time nodes. + # DateTime nodes default is handled above. + graph.add(( + node_uri, + SammConcepts.EXAMPLE_VALUE.uri, + Literal(vss_node.default, datatype=data_type))) + + case _: + # DEFAULT + log.warning( + "Could not match Characteristic type: '%s' for vss_node: '%s'\n", + rdf_type, + vss_node.qualified_name() + ) + + # Set RDF.type for current leaf node characteristic + graph.add((node_char_uri, RDF.type, rdf_type.uri)) + + # Set data_type for current vss_node's characteristic + graph.add((node_char_uri, SammConcepts.DATA_TYPE.uri, data_type)) + + +def add_node_leaf_constraint(graph: Graph, node_char_name: str, node_char_uri: URIRef, vss_node: VSSNode): + log.debug("Add leaf-node constraint") + + constraint_name = str_to_uc_first_camel_case(vss_node.ttl_name + 'Constraint') + constraint_node_uri = get_vspec_uri(constraint_name) + + graph.add((constraint_node_uri, RDF.type, SammCConcepts.RANGE_CONSTRAINT.uri)) + graph.add((constraint_node_uri, SammConcepts.NAME.uri, Literal(constraint_name))) + + # TODO: Workaround since doubles are serialized as scientific numbers + data_type = vssHelper.get_data_type(vss_node) + if data_type == XSD.double: + data_type = XSD.anyURI + + if vss_node.max: + graph.add((constraint_node_uri, SammCConcepts.MAX_VALUE.uri, Literal(vss_node.max, datatype=data_type))) + + if vss_node.min: + graph.add((constraint_node_uri, SammCConcepts.MIN_VALUE.uri, Literal(vss_node.min, datatype=data_type))) + + base_c_name = str_to_uc_first_camel_case(vss_node.ttl_name + 'BaseCharacteristic') + base_c_uri = get_vspec_uri(base_c_name) + + graph.add((node_char_uri, SammCConcepts.BASE_CHARACTERISTICS.uri, base_c_uri)) + graph.add((node_char_uri, RDF.type, SammCConcepts.TRAIT.uri)) + graph.add((node_char_uri, SammConcepts.NAME.uri, Literal(node_char_name))) + graph.add((node_char_uri, SammCConcepts.CONSTRAINT.uri, constraint_node_uri)) + + return base_c_uri + + +# Accepts list of: +# URIRef(s) - URIRef of the property node to be added to specified vss_node_uri +# +# or tuples in the form: +# ( URIRef, ( "optional", True ), ( "payloadName", "givenName" ) ) +# where: +# - URIRef - is the URIRef of the property node to be added to specified vss_node_uri +# - other, OPTIONAL, tuples are for property attributes: optional and payloadName +def add_node_properties(vss_nodes_uris: Sequence[URIRef | tuple], graph: Graph, vss_node_uri: URIRef): + log.debug( + "Prepare properties from vss nodes URIs:\n%s\n", + vss_nodes_uris + ) + + node_props = '' + + for node_uri in vss_nodes_uris: + if node_uri is not None: + property_prefix = ' ' if node_props else '' + + if type(node_uri) is tuple: + # Handle node_uri with some additional parameters + # EXAMPLE tuple: + # ( NodeURI, ( "optional", True ), ( "payloadName", "givenName" ) ) + # will result in: + # [ samm:property :prop1; samm:optional true; samm:payloadName "givenName"; ] + property_name = '' + is_optional = False # type: ignore + payload_name = '' + + for entry in node_uri: + if type(entry) is URIRef: + # TODO-Kosta: why this is like that?!?! + property_name = property_name = get_property_uri_from_node_uri(entry) + + elif type(entry) is tuple: + if entry[0] == "optional": + is_optional = entry[1] is True + + elif entry[0] == "payloadName": + # Make sure that payload name starts with lower case + payload_name = str_to_lc_first_camel_case(entry[1]) + + else: + log.warning( + "Node uri tuple: '%s'\n" + "is not in correct format: (attrName, attrValue) " + "or is not supported for a property attribute.\n", + entry + ) + + else: + log.warning( + "Node uri tuple: '%s'\n" + "is not in correct format: URIRef OR tuple(attrName, attrValue)\n", + entry + ) + + if property_name: + # TODO: make sure that we can work with SammConcepts like: SammConcepts.OPTIONAL + # here, instead of hardcoding samm:property, samm:optional and samm:payloadName + property_name = f'samm:property {property_name}' + is_optional = '; samm:optional true' if is_optional else '' # type: ignore + + if payload_name: + payload_name = f'; samm:payloadName "{payload_name}"' + else: + payload_name = '' + + if is_optional or payload_name: + property = f"[ {property_name}{is_optional}{payload_name} ]" + else: + property = property_name + + node_props += property_prefix + property + + elif type(node_uri) is URIRef: + node_props += property_prefix + get_property_uri_from_node_uri(node_uri) + + else: + log.warning( + "Not supported type: '%s' for Node URI: '%s'.", + type(node_uri), + node_uri + ) + + # Remove trailing white space and set node_props in the form: ( ... ) + node_props = "( {} )".format(node_props.strip()) + + log.debug( + "Add properties URIs to vss node URI: '%s'\n -- properties:\n%s\n", + vss_node_uri, + node_props + ) + + # Add properties to the specified vss_node_uri + graph.add((vss_node_uri, SammConcepts.PROPERTIES.uri, Literal(node_props))) + + +def get_property_uri_from_node_uri(vss_node_uri: URIRef): + return f":{str_to_lc_first_camel_case(vss_node_uri.replace(VSSConcepts.EMPTY.uri, ''))}" + + +def get_node_characteristic_name(node_uri: URIRef, has_limits: bool): + # Node characteristic name is based on the node property URI, and should be in the form: + # NodePropertyNameCharacteristic or NodePropertyNameTrait, in case if the node has some constraints + node_name = get_node_name_from_vspec_uri(node_uri) + characteristic_name_suffix = 'Trait' if has_limits else 'Characteristic' + + return str_to_uc_first_camel_case(node_name + characteristic_name_suffix) + + +# Helper function to convert a list of strings into TTL formatted ENUM VALUES list +# in the form: ( "value1" "value2" ... "value#" ) +# TODO-NOTE: the collection_to_process is provided from VSSNode.allowed, which is currently defined as str. +# However, ALL allowed definitions in VSS are set as list of strings. +# For example check VSS::FuelSystem::SupportedFuelTypes::allowed field. +# +# We might need to further refactor the VssNote.allowed field so it has the correct type: list[str] +# +# For the moment, we define the collection_to_process as: list[str] | str, so to pass the mypy check. +def get_enum_values(collection_to_process: list[str] | str): + enum_values = '' + + for value in collection_to_process: + if type(value) is str: + enum_values += '"' + value + '" ' + else: + enum_values += value + ' ' + + return Literal(f"( {enum_values.strip()} )") + + +def add_node_instances(graph: Graph, instances_dict_tree: dict, node_char_uri: URIRef): + instance_char_uri = None + + if instances_dict_tree: + log.debug( + "Build nodes from instances dict tree: '%s'", + instances_dict_tree['name'] + ) + + # Build Node Instance as characteristic which then will be added to the corresponding node_uri + node_instance_name = str_to_uc_first_camel_case(instances_dict_tree["name"] + "Instance") + instance_char_uri, instance_entity_uri = add_node_instance_characteristic_with_entity(graph, node_instance_name) + + instance_entity_properties = [] + skip_siblings_child_nodes = False + + # Populate the node_instance tree graph + for instance in instances_dict_tree["children"]: + log.debug( + "Build ttl node for instance: '%s'\n -- type: '%s'\n -- path: '%s'", + instance['name'], + instance['type'], + instance['path'] + ) + + if type(instance) is dict and instance["type"] == "branch": + log.debug( + "Build instance path: '%s' as branch node", + instance['path'] + ) + + # To make sure we have unique nodes, we use the instance path, similar to VSSNode ttl_name + ni_path_name = instance['path'].replace('.', '') + ni_char_name = str_to_uc_first_camel_case(ni_path_name) + + ni_prop_uri = '' + ni_type_name = '' + + if instance["instance_type"]: + # This instance is from a specific type. + # For example: instances Row1, Row2, ..., Row# will have an instance_type: Row + # + # Such instances should share a common characteristic, + # i.e. if the instance is DoorRow1, then its characteristic should be: DoorRow, + # which later will be shared with all other DoorRow# instances as their shared Characteristic + + # Instance Type should be prefixed by its parent name to make it more unique, similar to ttl_name + ni_type_name = '{}{}'.format(instance['parent']['name'], instance['instance_type']) + type_char_name = str_to_uc_first_camel_case(ni_type_name) + type_char_uri, type_entity_uri = add_node_instance_characteristic_with_entity(graph, type_char_name) + + # In this case, the created type_entity will represent a common node for each instance of that type. + # Idea is that we can share common type node like characteristic for instances of the same type. + ni_entity_uri = type_entity_uri + + ni_prop_uri = add_node_instance_property(graph, ni_path_name, type_char_uri) + + else: + # If the current instance does not have instance_type, just build it as usual + ni_char_uri, ni_entity_uri = add_node_instance_characteristic_with_entity(graph, ni_char_name) + ni_prop_uri = add_node_instance_property(graph, ni_path_name, ni_char_uri) + + # Make sure to turn of skip_siblings_child_nodes, + # to handle properly children of the current instance + skip_siblings_child_nodes = False + + # Add created instance node uri to its entity properties. + # NOTE: All instance child nodes should be optional + instance_entity_properties.append((ni_prop_uri, ("optional", True), ("payloadName", instance['name']))) + + if not skip_siblings_child_nodes: + # Build instance - entity properties (children) nodes of current instance + add_instance_entity_properties( + graph, instance['children'], ni_type_name, node_char_uri, ni_entity_uri) + + if ni_type_name: + # Set flag to skip adding properties for instance_typed nodes. + # This is to avoid adding of duplicate properties + # of instances of same type to their main - type node + skip_siblings_child_nodes = True + + elif type(instance) is dict and instance["type"] == "attribute": + # Attribute instance is like a VSSNode leaf node + log.debug( + "Build instance path: '%s' as leaf node.", + instance['path'] + ) + + leaf_path_name = instance['path'].replace('.', '') + leaf_uri = add_node_instance_property(graph, leaf_path_name, node_char_uri) + + instance_entity_properties.append((leaf_uri, ("optional", True), ("payloadName", instance['name']))) + + else: + log.warning( + "Instance: '%s' with type: '%s' cannot be processed yet.\n", + instance, + type(instance) + ) + + add_node_properties(instance_entity_properties, graph, instance_entity_uri) + + return instance_char_uri + + +# Helper function to support handling of creation of instance nodes for a VSSNode +def add_node_instance_characteristic_with_entity(graph: Graph, node_name: str): + # NOTE: Node instance characteristic should be of type: + # SammCConcepts.SINGLE_ENTITY instead of SammConcepts.CHARACTERISTIC + + # Append the characteristic class (type) to its name uri + node_char_uri = get_vspec_uri('{}{}'.format(node_name, SammCConcepts.SINGLE_ENTITY.vsso_name)) + node_char_tuple = (node_char_uri, RDF.type, SammCConcepts.SINGLE_ENTITY.uri) + + node_entity_name = str_to_uc_first_camel_case(node_name + "Entity") + node_entity_uri = get_vspec_uri(node_entity_name) + + # Check if characteristic is already added to the graph + if graph.__contains__(node_char_tuple): + # This characteristic is already present, just return its URI and + return node_char_uri, node_entity_uri + # ELSE: continue with node creation as usual + + graph.add(node_char_tuple) + graph.add((node_char_uri, SammConcepts.NAME.uri, Literal(node_name))) + + # TODO: move to helper function which will check for existing node_uri etc!!! + graph.add(( + node_char_uri, + SammConcepts.PREFERRED_NAME.uri, + Literal(str_camel_case_split(node_name), "en"))) + + # Add the node entity to the graph + graph.add((node_entity_uri, RDF.type, SammConcepts.ENTITY.uri)) + + # Add the node entity to its characteristic as data type + graph.add((node_char_uri, SammConcepts.DATA_TYPE.uri, node_entity_uri)) + + return node_char_uri, node_entity_uri + + +def add_node_instance_property(graph: Graph, instance_name: str, instance_char_uri: URIRef): + property_name = str_to_lc_first_camel_case(instance_name) + property_uri = get_vspec_uri(property_name) + property_tuple = (property_uri, RDF.type, SammConcepts.PROPERTY.uri) + + if graph.__contains__(property_tuple): + log.warning( + "A node with URI: '%s' already exists for property: '%s'.\nReturn: '%s'.\n", + property_uri, + property_name, + property_uri + ) + + return {property_uri} + + graph.add(property_tuple) + + # TODO: move to helper function which will check for existing node_uri etc!!! + graph.add(( + property_uri, + SammConcepts.PREFERRED_NAME.uri, + Literal(str_camel_case_split(instance_name), "en"))) + + # Bind this instance node to the provided instance_char_uri + graph.add((property_uri, SammConcepts.CHARACTERISTIC_RELATION.uri, instance_char_uri)) + + return property_uri + + +def add_instance_entity_properties( + graph: Graph, + instance_children: list[dict], + instance_type: str, + characteristic_uri: URIRef, + entity_uri: URIRef): + + if instance_children and len(instance_children) > 0: + entity_properties = [] + + for child_instance in instance_children: + child_path_name = '' + if instance_type: + # Use parent instance type + name for a child name + child_path_name = '{}{}'.format(instance_type, child_instance['name']) + else: + # Use usual child path for its name, similar to VSSNode ttl_name + child_path_name = child_instance['path'].replace('.', '') + + # Add node instance as property node with characteristic of its main - parent VSSNode + # NOTE: at this point each instance property node will be linked to the VSS defined node (characteristic) + # FOR EXAMPLE: Door.Row1.DriverSide instance will have a DoorCharacteristic node as defined in VSS. + child_uri = add_node_instance_property(graph, child_path_name, characteristic_uri) + + # Instance child properties should be optional and with payloadName without its parent prefix. + # For more details check function: add_node_properties + entity_properties.append((child_uri, ("optional", True), ("payloadName", child_instance['name']))) + + # Add child properties nodes to this instance's entity node + add_node_properties(entity_properties, graph, entity_uri) diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlHelper.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlHelper.py new file mode 100644 index 00000000..4b76f280 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/ttlHelper.py @@ -0,0 +1,226 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +from rdflib import Graph, URIRef +from rdflib.namespace import RDF +from vss_tools import log +from vss_tools.vspec.model.vsstree import VSSNode + +from .. config import config as cnfg +from . import ttlBuilderHelper as ttlBuilder +from . import vssHelper as vssHelper +from . namespaces import get_node_name_from_vspec_uri, samm_output_namespace +from . sammConcepts import VSSConcepts, SammConcepts +from . fileHelper import write_graph_to_file +from . stringHelper import str_to_uc_first_camel_case + + +# TODO: check if better to work with Path for path_to_ttl instead of a string + +# Parse provided VSSNode to an RDF Graph and write it to a TTL file +def parse_vss_tree(path_to_ttl: str, vss_node: VSSNode, split_vss: bool): + # Process provided vss_node so to get all of its unique VSSNode names, check if there is any duplicates etc. + # This will be further used to make sure that we don't get any duplicate nodes in the generated TTL graph(s) + vssHelper.count_vss_tree_unique_node_names(vss_node) + + log.debug( + "Parse VSS node: '%s' to TTL file\n -- as aspect%s\n", + vss_node.name, + '\n -- split' if split_vss else '' + ) + + # Skip parsing of VSSNode is branches, which have only deprecated children + # NOTE: this is a special case for the OBD branch, which children are marked as deprecated from VSS 5.0, + # but the branch itself is not marked as deprecated. + deprecated_children = [] + if vss_node.is_branch(): + for child_node in vss_node.children: + if hasattr(child_node, 'deprecation') and len(child_node.deprecation.strip()) > 0: + deprecated_children.append(child_node) + + if len(deprecated_children) == len(vss_node.children): + log.warning( + "All child nodes of VSSNode: '%s' are deprecated.\n" + "Skip the parsing of VSSNode: '%s'.\n", + vss_node.name, + vss_node.name) + + return 'DEPRECATED' + + # Initialize RDF Graph for current node + graph = ttlBuilder.setup_graph() + + # Build VSS graph tree node. + # NOTE: ESMF-AME requires standalone aspect models to have an aspect node. + node_uri = handle_vss_node(path_to_ttl, graph, vss_node, True, split_vss) + + if node_uri != 'DEPRECATED': + # Print graph for current vss_node to a TTL file + # NOTE: make sure that TTL file name will reflect this graph's Aspect model name, i.e. uc first camel case + vss_node_ttl_file = write_graph_to_file( + path_to_ttl, + str_to_uc_first_camel_case(vss_node.ttl_name), + graph) + + log.debug( + "TTL file for parsed VSS node: '%s' is:\n'%s'\n", + vss_node.name, + vss_node_ttl_file + ) + + return node_uri + + +def handle_vss_node( + path_to_ttl: str, + graph: Graph, + vss_node: VSSNode, + is_aspect: bool, + split_vss: bool): + log.debug( + "Handle VSSNode: '%s'\n -- node VSS path: '%s'\n -- is_aspect: '%s'\n -- is_expanded: '%s'", + vss_node.name, + vss_node.qualified_name(), + is_aspect, + is_node_expanded(vss_node) + ) + + # Check if node is deprecated and return DEPRECATED so it will be skipped for further conversion to TTL + if hasattr(vss_node, 'deprecation') and len(vss_node.deprecation.strip()) > 0: + log.warning( + "Skipping VSSNode: '%s' since it is deprecated.\n" + "Deprecation: '%s'\n", + vss_node.name, + vss_node.deprecation + ) + + return 'DEPRECATED' + + # Build the general graph node for current vss_node + node_uri = ttlBuilder.add_graph_node(graph, vss_node, is_aspect) + + if vss_node.is_branch(): + node_char_uri = ttlBuilder.add_node_branch_characteristic(graph, vss_node, node_uri) + + if vss_node.has_instances() and is_node_expanded(vss_node) is False: + # Build instance(s) node(s) for the current vss_node ONLY when node is NOT EXPANDED. + # Otherwise, the expanded child nodes will cause conflicts with generated instance nodes + # and mess up the generated aspect model. + # + # NOTE: in this case the node's characteristic (node_char_uri), + # will become a characteristic of each of this vss_node's instance(s) + node_name = get_node_name_from_vspec_uri(node_uri) + + instances_dict_tree = vssHelper.get_instances_dict_tree(vss_node.instances, node_name) + + node_instance_uri = ttlBuilder.add_node_instances(graph, instances_dict_tree, node_char_uri) + + # Link current vss_node to its instance uri, as if the instance uri is characteristic of this vss_node + graph.add((node_uri, SammConcepts.CHARACTERISTIC_RELATION.uri, node_instance_uri)) + + else: + # Link the characteristic uri to current vss_node's node_uri as usual + graph.add((node_uri, SammConcepts.CHARACTERISTIC_RELATION.uri, node_char_uri)) + + handle_branch_node(path_to_ttl, graph, vss_node, split_vss, node_uri, node_char_uri) + + if vss_node.is_leaf: + ttlBuilder.add_node_leaf(graph, node_uri, vss_node) + + return node_uri + + +def handle_branch_node( + path_to_ttl: str, + graph: Graph, + vss_node: VSSNode, + split_vss: bool, + node_uri: URIRef, + node_char_uri: URIRef): + log.debug("Handle branch node for VSSNode: '%s'", vss_node.name) + + # Create node Entity to this vss_node branch + # TODO-Kosta: comment should be like: + # -- create node Entity + log.debug(" -- create node Entity") + + # NOTE: this will be like a Class representation of the current vss_node + # In order to keep consistent the naming of semantic nodes, + # we should append 'Entity' to the node's name, taken from its node_uri. + node_name = get_node_name_from_vspec_uri(node_uri) + + # Node Entity name should be in camel case format with its first character in UPPER CASE + node_entity_name = str_to_uc_first_camel_case(node_name + 'Entity') + node_entity_uri = URIRef(VSSConcepts.EMPTY.uri_string + node_entity_name) + + # Add the node entity to the graph + graph.add((node_entity_uri, RDF.type, SammConcepts.ENTITY.uri)) + + # Add the node entity to its characteristic as data type + graph.add((node_char_uri, SammConcepts.DATA_TYPE.uri, node_entity_uri)) + + # Populate Entity properties if the current vss_node holds any child node + if hasattr(vss_node, "children") and len(vss_node.children) > 0: + properties_uris = [] + + for child_node in vss_node.children: + child_node_uri = None + + if (split_vss and child_node.depth <= cnfg.SPLIT_DEPTH) and child_node.is_branch(): + # Build VSS node into separate Aspect model + # when --split option is provided + # and depth of current VSSNode is within specified config SPLIT_DEPT level, + # Default SPLIT_DEPT is 1, i.e. just 1st level branches like Vehicle.Cabin etc. + child_node_uri = parse_vss_tree(path_to_ttl, child_node, True) + + else: + # Each child should be a leaf node of its parent - i.e. NO ASPECTS and no split for child nodes + child_node_uri = handle_vss_node(path_to_ttl, graph, child_node, False, False) + + # TODO: review these and make sure we don't overlap usage of samm_output_namespace fields + if child_node_uri is not None \ + and child_node_uri != 'DEPRECATED' \ + and str(child_node_uri) != samm_output_namespace: + # Each property should have payloadName = property name, + # so to avoid the prefixed ttl_name when generating APIs and JSON payloads + properties_uris.append((child_node_uri, ("payloadName", child_node.name))) + + elif child_node_uri != 'DEPRECATED': + log.warning( + "Child node: '%s' does not have a valid URI: '%s' and is not added to '%s' node.\n", + child_node.name, + str(child_node_uri), + vss_node.name + ) + + # Add properties to current node_entity_uri + if len(properties_uris) > 0: + ttlBuilder.add_node_properties(properties_uris, graph, node_entity_uri) + + +# Helper function to check if a VSSNode is expanded +def is_node_expanded(vss_node: VSSNode): + # Try to check if vss_node has instances and its expanded flag is set + is_expanded = vss_node.has_instances() and vss_node.expanded is True + + if is_expanded is False and hasattr(vss_node, "children") and vss_node.children and len(vss_node.children) > 0: + # NOTE: this is a workaround since currently the available vss_node.expanded flag + # does not seem to reflect properly when VSSNode is expanded or not i.e., + # when script is using + # 1. the DEFAULT: --expand option which will lead to expansion of instances as children + # 2. or user has specified the --no-expand option - to prevent above instantiation i.e., + # instances are not created as children and this VssNode children holds + # ONLY the properties (details) of a single instance. + for child_node in vss_node.children: + is_expanded = child_node.__getattribute__("$file_name$") == "Generated" + + if is_expanded is True: + # It is enough if at least 1 child node is expanded. + break + + return is_expanded diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/helpers/vssHelper.py b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/vssHelper.py new file mode 100644 index 00000000..fd005576 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/helpers/vssHelper.py @@ -0,0 +1,572 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +import re + +from ast import literal_eval +from typing import Any +from vss_tools import log +from vss_tools.vspec.model.vsstree import VSSNode, VSSType, VSSDataType +from vss_tools.vspec.model.constants import VSSTreeType + +from .. config import config as cnfg +from . sammConcepts import SammConcepts, SammCConcepts +from . dataTypesAndUnits import DataTypes, DataUnits +from . stringHelper import ( + str_to_lc_first_camel_case, + str_to_uc_first_camel_case, + is_collection +) + +# +# Helper script. +# Provides set of functions to work with VSSNodes. +# + + +# A DICT collection of key => value entries, where: +# :key - vss_node.name of a node from provided VSSNote (tree) for parsing. +# :value - object of type: { counter: int, vss_paths: [str]} +# where: +# - counter - holds number of occurrences of the corresponding :key in the main VSSNode +# - vss_paths - array of qualified VSS path, for each node which name is matching the current :key +# +# This collection will be populated on the very first call of parse_vss_tree +# so to have a full overview of available VSS nodes' names, if there is any duplicated ones etc. +# Then it will be used to defined whether the corresponding vss_node should be prefixed with its parent name or not, +# as implemented in the should_use_parent_prefix function. +top_vss_tree_unique_node_names: dict[str, Any] = {} + + +def count_vss_tree_unique_node_names(vss_node: VSSNode): + if type(top_vss_tree_unique_node_names) is dict and len(top_vss_tree_unique_node_names.keys()) == 0: + # THIS SHOULD BE DONE ONLY ONCE, + # i.e. when we parse the top level tree and we have not yet read all of its nodes + populate_unique_node_names(top_vss_tree_unique_node_names, vss_node) + + +# Traverse through each node of provided vss_node tree and record unique node names, +# their number of occurrences and vss_paths for their duplicates if there is any. +# For more details, check comment for: top_vss_tree_unique_node_names field on lines: 31-42. +def populate_unique_node_names(node_names_dict: dict[str, Any], vss_node: VSSNode): + if not node_names_dict.get(vss_node.name): + # ADD vss_node to node_names_dict + node_names_dict.__setitem__( + vss_node.name, + { + 'counter': 1, + 'vss_paths': [vss_node.qualified_name()] + }) + else: + # UPDATE vss_node counters in the node_names_dict + node_names_dict[vss_node.name]['counter'] += 1 # type: ignore + node_names_dict[vss_node.name]['vss_paths'].append(vss_node.qualified_name()) # type: ignore + + # Process vss_node children + if vss_node.children and len(vss_node.children) > 0: + for vss_child_node in vss_node.children: + populate_unique_node_names(node_names_dict, vss_child_node) + + +def get_parent_prefix_for_ttl_name(vss_node: VSSNode, ttl_name: str, use_vehicle_prefix=False): + parent_prefix = '' + + if vss_node.parent and (vss_node.parent.name != "Vehicle" or use_vehicle_prefix): + # There is a special case. + # EXAMPLE: Issuer signal, which parent is: Identifier + # and it appears in two separate branches: + # Vehicle.Cabin.Seat.Occupant.Identifier.Issuer + # and + # Vehicle.Driver.Identifier.Issuer + # + # In this case we need to take current vss_node parent ttl_name, instead of just its name, + # so this will make the current vss_node ttl_name unique enough, + # in order it can be loaded across different models. + # + # This is specially, when a user is using the split option + if vss_node.parent.name != vss_node.parent.ttl_name \ + and top_vss_tree_unique_node_names[vss_node.name]["counter"] > 0 \ + and sum(vss_node.parent.name in vss_path + for vss_path in top_vss_tree_unique_node_names[vss_node.name]["vss_paths"]) > 1: + parent_prefix = vss_node.parent.ttl_name + + else: + parent_prefix = vss_node.parent.name + + if ttl_name.startswith(parent_prefix): + return get_parent_prefix_for_ttl_name(vss_node.parent, ttl_name, use_vehicle_prefix=True) + + return parent_prefix + + +# Set ttl_name for provided vss_node. +# BY DEFAULT: ttl_name will be set to the current vss_node.name. +# vss_node - the VSSNode to be updated +# use_parent_prefix - when provided, the vss_node.parent.name will be used as prefix to its vss_node.name, +# if the vss_node has a parent +# overwrite_ttl_name - if specified and the provided vss_node, already has a ttl_name, +# it will be overwritten based on preserve_ttl_name +# preserve_ttl_name - if specified and the provided vss_node, already has a ttl_name, +# then its ttl_name will be preserved and will be prefixed with its parent name +def set_ttl_name(vss_node: VSSNode, use_parent_prefix: bool, overwrite_ttl_name=False, preserve_ttl_name=False): + log.debug("Set ttl name for VSS Node: '%s'.", vss_node.name) + + if vss_node.ttl_name and not overwrite_ttl_name: + log.debug( + " -- node ttl name: '%s' is already set.", + vss_node.ttl_name + ) + + else: + ttl_name = vss_node.ttl_name if vss_node.ttl_name and preserve_ttl_name else vss_node.name + + if use_parent_prefix: + parent_prefix = get_parent_prefix_for_ttl_name(vss_node, ttl_name) + vss_node.ttl_name = parent_prefix + ttl_name + + else: + vss_node.ttl_name = ttl_name + + log.debug( + " -- %s node ttl name: '%s'.", + "updated" if overwrite_ttl_name else "added", + vss_node.ttl_name + ) + + +# Helper function to clone a provided VSSNode, instead of to copy it, using the python available copy functionality, +# which will copy the whole node. +# +# This is required so we can have a "clean" unbound to properties of the provided VSSNode, copy of it, +# which then can be further manipulated. +# +# For example: filter the children of cloned VSSNode and add them to its cloned (copied) version. +# REASON FOR THIS is that: +# VSSNode.children is immutable, when the node had some children +# BUT: we can set (add) them, in case when VSSNode.children was initially empty. +def clone_vss_node(vss_node: VSSNode): + # NOTE: Below properties names are taken from: VSSNode: validate_vss_element function. + # If required to sync these, use the aforementioned function as reference. + # + # Currently we skip the "children" one, + # as these will be further filtered and populated in the cloned_vss_node. + vss_node_dict = {} + + # key:value pairs where: + # key - is the VSSNode property + # value - is the VSS specification, property, + # which was used for the creation of vss_node, + # i.e. its corresponding name in the used for creation of VSSNode, dictionary. + properties_to_clone = { + "$file_name$" : "$file_name$", # noqa: E203 + "type" : "type", # noqa: E203 + "datatype" : "datatype", # noqa: E203 + "description" : "description", # noqa: E203 + "unit" : "unit", # noqa: E203 + "uuid" : "uuid", # noqa: E203 + "min" : 'min', # noqa: E203 + "max" : "max", # noqa: E203 + "aggregate" : "aggregate", # noqa: E203 + "default" : "default", # noqa: E203 + "instances" : "instances", # noqa: E203 + "deprecation" : "deprecation", # noqa: E203 + "arraysize" : "arraysize", # noqa: E203 + "comment" : "comment", # noqa: E203 + "allowed" : "allowed" # noqa: E203 + } + + for vss_prop_name in properties_to_clone.keys(): + if vss_prop_name == "default" and vss_node.type not in [VSSType.ATTRIBUTE, VSSType.ACTUATOR, VSSType.SENSOR]: + # Skip processing for "default" fields for NON ATTRIBUTE, ACTUATOR and SENSOR nodes + continue + + if hasattr(vss_node, vss_prop_name): + vss_prop_value = vss_node.__getattribute__(vss_prop_name) + vss_dict_prop_name = properties_to_clone[vss_prop_name] + + if type(vss_prop_value) in [VSSType, VSSDataType]: + vss_node_dict[vss_dict_prop_name] = vss_prop_value.value + else: + vss_node_dict[vss_dict_prop_name] = vss_prop_value + + return VSSNode(vss_node.name, vss_node_dict, VSSTreeType.SIGNAL_TREE.available_types()) + + +# Helper function, to check whether to use parent prefix for a vss_node or not +def should_use_parent_prefix(vss_node: VSSNode): + if (top_vss_tree_unique_node_names[vss_node.name]['counter'] > 1) \ + or (vss_node.is_leaf + and hasattr(vss_node, 'datatype') + and vss_node.datatype is VSSDataType.BOOLEAN + and (vss_node.name.lower().startswith('is')) + and (vss_node.parent and vss_node.parent.name not in vss_node.name) + ): + # Use prefix when: + # 1) the current vss_node.name occurs multiple time in the top level VSS tree + # or + # 2) for leaf nodes of type BOOLEAN, + # which name starts with 'Is' or 'is' + # and their parent name is not included within their own vss_node.name + return True + else: + return False + + +# Helper function to check, whether specified vss_node is selected to be converted to a TTL model +def is_vss_node_selected_for_processing(selected_signals_paths: list[str], vss_node: VSSNode): + if selected_signals_paths is None or (type(selected_signals_paths) is list and len(selected_signals_paths) == 0): + return True + + # VSSNode qualified_name returns the VSS path in form: PARENT_NAME.PATH.TO.NODE.NODE_NAME + vss_node_path = vss_node.qualified_name() + node_is_found = False + for signal_path in selected_signals_paths: + if len(vss_node_path) > len(signal_path): + # Check if selected signal_path is part of the current vss_node path + node_is_found = signal_path in vss_node_path + else: + # Check if the current current vss_node path is part of the selected signal_path + node_is_found = vss_node_path in signal_path + + if node_is_found: + break + + return node_is_found + + +# Filter provided vss_node, based on specified selected_paths +# and return a new vss_node with just selected_paths VSSNodes +def filter_vss_tree(vss_node: VSSNode, selected_paths: list[str]) -> VSSNode | None: + filtered_vss_node = None + + if is_vss_node_selected_for_processing(selected_paths, vss_node): + # Clone the current vss_node so we can further filter its child nodes + filtered_vss_node = clone_vss_node(vss_node) + + if vss_node.children is not None and len(vss_node.children) > 0: + # Filter vss_node child nodes and add them to the filtered_vss_node + filtered_children = [] + + for child_node in vss_node.children: + if is_vss_node_selected_for_processing(selected_paths, child_node): + # Call this function recursively so to filter current child_node + filtered_child_node = filter_vss_tree(child_node, selected_paths) + + if filtered_child_node is not None: + # Add filtered child_node to filtered_children or skip it + filtered_children.append(filtered_child_node) + + filtered_vss_node.children = filtered_children + + # ELSE: vss_node was not selected for processing. + # Just return it as None so it will be skipped for further processing + return filtered_vss_node + + +def get_node_property_name(vss_node: VSSNode): + # Property names are based on the vss_node.ttl_name => make sure it is set + set_ttl_name(vss_node, should_use_parent_prefix(vss_node)) + + # Graph - property names are in camel case format, where 1st character is in lower case + return str_to_lc_first_camel_case(vss_node.ttl_name) + + +# Helper function to build VSSnode description in the form: +# VSS path : ... +# Description: ... +# Comment : ... +# and return it for addition to a graph node for specified vss_node +def get_node_description(vss_node: VSSNode): + # Set description for this vss_node. + # Will include its: VSS path, Description, Comment and Unit if any of these is available. + description = '' + + # Use spacer to align ':' in 'VSS path:' with 'Description:' and / or 'Comment:' + spacer = '' + + if hasattr(vss_node, "description") and len(str(vss_node.description).strip()) > 0: + if '"' in str(vss_node.description): + # Escape double quotes within vss_node.description + vss_node.description = vss_node.description.replace('"', f'{cnfg.CUSTOM_ESCAPE_CHAR}"') # type: ignore + + # Set 3 spaces spacer, so to align 'VSS path:' with 'Description:' + spacer = ' ' + description = f"\n\nDescription: {vss_node.description}" + + # NOTE: there is also a vss_node.comment field which also holds some details + # Add the vss_node.comment to its description + if hasattr(vss_node, "comment") and len(vss_node.comment.strip()) > 0: + if '"' in vss_node.comment: + vss_node.comment = vss_node.comment.replace('"', f'{cnfg.CUSTOM_ESCAPE_CHAR}"') + + # Use the 3 empty spaces spacer, when there is description, + # otherwise set it to align 'VSS path:' with 'Comment:' + spacer = spacer if description else ' ' + + # Align 'Comment:' with 'Description:' and 'VSS path:' + description = f"{description}\n\nComment{' ' if description else ''}: {vss_node.comment}" + + if vss_node.has_unit(): + description = f"{description}\n\nUnit{' ' if description else ''}: {vss_node.get_unit()}" + + return f"\nVSS path{spacer}: {vss_node.qualified_name()}{description}" + + +def has_constraints(vss_node: VSSNode): + return vss_node.max or vss_node.min + + +def get_instances_dict_tree(vss_node_instances: list | None, node_instance_name: str): + log.debug( + "Create instances node tree for node instance name: '%s'.", + node_instance_name + ) + + parsed_instances = [] + + if type(vss_node_instances) is list and len(vss_node_instances) > 0: + for instance_key in range(len(vss_node_instances)): + instance = vss_node_instances[instance_key] + parsed_instance = parse_instance(instance) + parsed_instances.append(parsed_instance) + + log.debug( + " -- convert parsed_instances: \n%s\n to a tree now...", + parsed_instances + ) + + node_instance_dict: dict[str, Any] = get_instance_dict(node_instance_name, None, '') + + add_instance_to_dict_tree(node_instance_dict, parsed_instances, '') + + # TODO: maybe try to return a VSSNode(vss_node.name, vss_node_dict) + # IF so this will require further rework on the whole instance node generation + # implemented in TTLBuilderHelper::add_node_instances + return node_instance_dict + + +def parse_instance(instance_to_parse): + parsed_instance = None + + if type(instance_to_parse) is str \ + and instance_to_parse.__contains__("[") \ + and instance_to_parse.endswith("]"): + # More likely, current instance is in the form: "Row[1,2]" or Row[1,4] or "SomeOtherName[x,y]" + # Example: Row[1, 2] or Row[1, 4] + parsed_instance = [] + + # Take the name of this instance + instance_parts = instance_to_parse.split("[") + instance_name = instance_parts[0] + instance_entries = literal_eval("[" + instance_parts[1]) + + if len(instance_entries) == 2 \ + and instance_entries[0] == 1 \ + and sum(float(entry).is_integer() for entry in instance_entries) == 2: + # Here instances are set as a range, starting with 1 up to some other number + # EXAMPLE: Row[1,4] - i.e. rows from 1st to 4th or: Row1, Row2, Row3, Row4 + start = instance_entries[0] + end = instance_entries[1] + + while start <= end: + parsed_instance.append(f"{instance_name}{start}") + start += 1 + + else: + for instance_entry in instance_entries: + parsed_instance.append(f"{instance_name}{instance_entry}") + else: + parsed_instance = instance_to_parse + + return parsed_instance + + +# Helper function, which builds an instance object dict for specified instance_name, +# which then can be added to an instances_dict_tree +def get_instance_dict(instance_name: str, parent_dict: dict | None, instance_type: str): + # TODO: use as ref: load_tree and render_tree in vspec\__init__.py + # IDEALLY this should return a VSSNode instance!!! + + # Default type is: attribute for a string. Branch is for instance with children + # TODO: if required we can add some kind of a short description or other additional fields + name = str_to_uc_first_camel_case(instance_name) + + instance = { + "type": "attribute", + "name": name, + "children": None, + "path": name, + "description": "", + "instance_type": instance_type, + "parent": parent_dict + } + + return instance + + +def add_instance_to_dict_tree(parent_dict_tree: dict[str, Any], instance_to_add, instance_type: str): + if type(instance_to_add) is str: + # ADD instance to the provided parent_tree + + # Default type is: attribute for a string. Branch is for instance with children + instance_dict = get_instance_dict(instance_to_add, parent_dict_tree, instance_type) + + if type(parent_dict_tree) is dict: + # Make sure to set parent type to: branch + parent_dict_tree["type"] = "branch" + + if not parent_dict_tree["children"]: + # Make sure to initialize parent children + parent_dict_tree["children"] = list() + + # Add path up to this instance node + instance_dict["path"] = f"{parent_dict_tree['path']}.{instance_dict['name']}" + + parent_dict_tree["children"].append(instance_dict) + + elif type(parent_dict_tree) is list: + # TODO-Kosta: can we ever get into this point!?!?!?! + # Add path up to this instance node + # instance["path"] = instance['name'] + parent_dict_tree.append(instance_dict) # type: ignore + + else: + log.warning( + "Provided parent_tree: '%s' must be either a dict or list (array).\n", + parent_dict_tree + ) + + elif type(instance_to_add) is list: + # Handle instance_to_add as list of instances to be added to the parent_tree + # Call this function recursively to add each instance entry to the provided parent_tree + + # Get instance entries type. If these are in the form: Name1, Name2, Name3 etc + # then their type will be: Name, otherwise it will be None + instance_type = get_instances_type_from_list(instance_to_add) + + if type(parent_dict_tree) is dict and parent_dict_tree["children"] and len(parent_dict_tree["children"]) > 0: + # Add each entry in instance_to_add as a child of the parent_tree["children"] + for inst_dict in parent_dict_tree["children"]: + for entry in instance_to_add: + add_instance_to_dict_tree(inst_dict, entry, instance_type) + + else: + # Add each entry in instance_to_add as sibling in current parent_tree + for entry in instance_to_add: + add_instance_to_dict_tree(parent_dict_tree, entry, instance_type) + + else: + log.warning( + "Provided instance_to_add: '%s' type: '%s' is not supported.\n", + instance_to_add, + type(instance_to_add) + ) + + +def get_instances_type_from_list(instances_list: list): + instance_type = '' + # Check if instance entries are of same type, based on their name prefix + if sum(type(entry) is str for entry in instances_list) == len(instances_list): + # Check if each entry has a common name + for entry in instances_list: + # Make sure to strip any numbers of the entry (instance_name) + instance_name = re.sub(r'[0-9]', '', entry) + if not instance_type: + # Read instance_type from very first entry + instance_type = instance_name + + elif instance_type != instance_name: + instance_type = '' + # Break the checks as there is at least 1 entry, + # which is not in same format as the 1st one + break + + return instance_type + + +def get_node_rdf_type(vss_node: VSSNode): + if hasattr(vss_node, "allowed") and vss_node.allowed and type(vss_node.allowed) is list: + # NOTE: ENUM does not have defaultValue + # Instead defaultValue is part of STATE characteristic which inherits from ENUM + return SammCConcepts.ENUM if not vss_node.default else SammCConcepts.STATE + + elif hasattr(vss_node, 'datatype') and vss_node.datatype and is_collection(vss_node.datatype.value): + return SammCConcepts.LIST + + elif vss_node.type in [VSSType.ATTRIBUTE, VSSType.ACTUATOR, VSSType.SENSOR] \ + and vss_node.has_unit() \ + and vss_node.unit != 'iso8601': + # NOTE: DateTime vss_nodes should have an SammConcepts.CHARACTERISTIC data_type + return SammCConcepts.MEASUREMENT + + elif hasattr(vss_node, "allowed") and vss_node.allowed != "" and type(vss_node.allowed) is not list: + log.warning( + "VSSNode: '%s' with path: '%s',\n" + "has allowed data of type: '%s', which is not handled yet.\n" + "Allowed data: '%s'.\n", + vss_node.name, + vss_node.qualified_name(), + type(vss_node.allowed), + vss_node.allowed + ) + + # For unset / unmatched vss_node units - just leave its characteristic type to: Characteristic + return SammConcepts.CHARACTERISTIC + + else: + # Return vss_node RDF.type to be a general Characteristic + return SammConcepts.CHARACTERISTIC + + +def get_data_type(vss_node: VSSNode): + if vss_node.has_unit() and vss_node.unit == 'iso8601': + # DateTime VSSNodes data_type should be based on their unit + # instead of their datatype, which is more likely to be set as 'string'. + return DataTypes[vss_node.unit.value] + + # TODO: review usage of datatype as per latest COVESA VSS and VSS-TOOLS and update code accordingly!!! + if not hasattr(vss_node, 'datatype'): + log.warning( + "DataType field is missing in VSSNode: '%s'\nDEFAULTING it to: '%s'\n", + vss_node.name, + DataTypes['anyURI'] + ) + + return DataTypes['anyURI'] + + # Set data_type based on VssNode datatype as usual or default it to: anyURI + data_type = vss_node.datatype.value if vss_node.datatype else 'anyURI' + + # VssNode datatype has been set as array + if data_type.endswith('[]'): + # Read just the type of the array / list + data_type = data_type[:-2] + + if DataTypes.get(data_type): + return DataTypes[data_type] + else: + log.warning( + "DataType: '%s' not found\nDEFAULTING it to: '%s'\n", + data_type, + DataTypes['anyURI'] + ) + + return DataTypes['anyURI'] + + +def get_data_unit_uri(unit: str): + if DataUnits.get(unit): + return DataUnits[unit] + else: + log.warning( + "No DataUnit found for unit: '%s'.\nDEFAULTING it to: '%s'\n", + unit, + DataUnits['blank'] + ) + + return DataUnits['blank'] diff --git a/src/vss_tools/vspec/vssexporters/vss2samm/vss2samm.py b/src/vss_tools/vspec/vssexporters/vss2samm/vss2samm.py new file mode 100644 index 00000000..c97c1914 --- /dev/null +++ b/src/vss_tools/vspec/vssexporters/vss2samm/vss2samm.py @@ -0,0 +1,331 @@ +# Copyright (c) 2024 Contributors to COVESA +# +# This program and the accompanying materials are made available under the +# terms of the Mozilla Public License 2.0 which is available at +# https://www.mozilla.org/en-US/MPL/2.0/ +# +# SPDX-License-Identifier: MPL-2.0 + +# +# Convert all vspec input files to a ESMF - Aspect Model Editor (SAMM) - ttl formatted file(s). +# + + +import logging +import importlib +import rich_click as click +import vss_tools.vspec.cli_options as clo + +from pathlib import Path +from vss_tools import log +from vss_tools.vspec import VSpecError +from vss_tools.vspec.model.vsstree import VSSNode +from vss_tools.vspec.vssexporters.utils import get_trees + +from . config import config as cnfg + +VSSConcepts = None +vssHelper = None +ttlHelper = None + + +def __setup_environment(output_namespace, vspec_version, split_depth: int): + # Initialize config before to load other helpers / libraries, based on defined by user input, config data + cnfg.init(output_namespace, vspec_version, split_depth) + + global VSSConcepts + VSSConcepts = importlib.import_module("vss_tools.vspec.vssexporters.vss2samm.helpers.sammConcepts").VSSConcepts + + global vssHelper + vssHelper = importlib.import_module("vss_tools.vspec.vssexporters.vss2samm.helpers.vssHelper") + + global ttlHelper + ttlHelper = importlib.import_module("vss_tools.vspec.vssexporters.vss2samm.helpers.ttlHelper") + + +# TODO: Currently this is a workaround to read the Vehicle.VersionVSS, which is provided from COVESA/VSS +# and provides the supported VSS version. +# In best case this functionality should be provided by the vspec tool, loaded in this script. +# +# Once we migrate the vss2samm tool under the COVESA/vss-tools, +# we could move this functionality under the vspec script +# and make sure that we read the version from the COVESA/vehicle_signal_specification/VERSION +# or as helper function under the VSSNode - root tree. +def __get_version_vss(vss_tree: VSSNode): + # DEFAULT version would be 1.0.0 + major = 1 + minor = 0 + patch = 0 + + if vss_tree.children and len(vss_tree.children) > 0: + # Get the VersionVSS node so to extract current VSS version from it. + vss_version_node = next(filter(lambda vs_child: vs_child.name == 'VersionVSS', vss_tree.children), None) + + if vss_version_node: + for v_child in vss_version_node.children: + if v_child.name in ['Major', 'Minor', 'Patch'] \ + and hasattr(v_child, 'default') \ + and type(v_child.default) is int and v_child.default > 1: + + match v_child.name: + case 'Major': + major = v_child.default + case 'Minor': + minor = v_child.default + case 'Patch': + patch = v_child.default + + return f"{major}.{minor}.{patch}" + + +# Exporter specific options +@clo.option( + '-sigf', '--signals-file', + type=click.Path(dir_okay=False, readable=True, path_type=Path, exists=True), + help="""\b +Path to file with selected VSS signals to be converted. +Allows to convert just selected VSS signals into aspect model(s), if '-spl / --split' is enabled. +\033[36mNOTE:\033[0m each signal in the file should be on a new line and in the format of: + \033[96mPARENT_SIGNAL.PATH.TO.CHILD_SIGNAL\033[0m as defined in VSS. +\033[33mEXAMPLE:\033[0m \033[96m-sigf PATH_TO_FILE/selected_signals.txt\033[0m + """ +) +@clo.option( + "--split-depth", "-spld", + type=int, + default=1, + show_default=False, + help="""\b +Number - used to define, up to which level, VSS branches will be converted into single aspect models. +Can be used in addition to the \033[32m-spl, --split\033[0m option. +Default value of 1 means that only 1st level VSS branches like Vehicle.Cabin, Vehicle.Chassis etc., +will be converted to separate aspect models. +\033[30m[default: 1]\033[0m + """ +) +@clo.option( + "--split/--no-split", "-spl", + default=False, + show_default=False, + help="""\b +Boolean flag - used to indicate whether to convert VSS specifications in separate ESMF Aspect(s) +or the whole (selected) VSS specification(s) will be combined into single ESMF Aspect model. +\033[30m[default: False]\033[0m + """ +) +@clo.option( + "--target-namespace", "-tns", "output_namespace", + type=str, + default='com.covesa.vss.spec', + show_default=False, + help="""\b +Namespace for VSS library, located in specified '--target-folder'. +Will be used as name of the folder where VSS Aspect models (TTLs) are to be stored. +This folder will be created as subfolder of the specified '--target-folder' parameter. +\033[30m[default: com.covesa.vss.spec]\033[0m + """ +) +# TODO: shall we use Path, similar to output option, or just a simple string here? +@clo.option( + "--target-folder", "-tf", + # type=click.Path(dir_okay=True, writable=True, path_type=Path), + default='vss_ttls/', + show_default=False, + help="""\b +Path to or name for the target folder, where generated aspect models (.ttl files) will be stored. +\033[36mNOTE:\033[0m This folder will be created relatively to the folder from which this script is called. +\033[30m[default: vss_ttls/]\033[0m +""" +) +# END of VSS2SAMM CUSTOM OPTIONS +@click.command() +@clo.vspec_opt +@clo.include_dirs_opt +@clo.extended_attributes_opt +@clo.strict_opt +@clo.aborts_opt +@clo.uuid_opt +@clo.expand_opt +@clo.overlays_opt +@clo.quantities_opt +@clo.units_opt +@clo.types_opt +@clo.types_output_opt +@clo.log_level_opt +@clo.log_file_opt +def cli( + vspec: Path, + target_folder: str, + include_dirs: tuple[Path], + extended_attributes: tuple[str], + strict: bool, + aborts: tuple[str], + uuid: bool, + expand: bool, + overlays: tuple[Path], + quantities: tuple[Path], + units: tuple[Path], + types: tuple[Path], + types_output: Path, + signals_file, + output_namespace, + split, + split_depth, + log_level: str, + log_file: Path, +): + """ + Export as Eclipse Semantic Modeling Framework (ESMF) - Semantic Aspect Meta Model (SAMM) - .ttl files. + """ + + # TODO-Kosta: For some reason, the main cli in vspec.py does not seem to get executed + # and log level is always INFO. + # Therefore I use the setLevel of log_level option before further execution of this script. + if log_file: + file_handler = logging.FileHandler(log_file, mode="w") + file_handler.setFormatter( + logging.Formatter("%(asctime)s:%(levelname)s:%(message)s") + ) + log.addHandler(file_handler) + + log.setLevel(log_level) + # END of copied logging set up from vspec.py::cli + + log.info("Loading VSS Tree...\n") + + tree, datatype_tree = [None, None] + + try: + # TODO-Kosta: double check if we need all of below options or we can skip some? + tree, datatype_tree = get_trees( + include_dirs, + aborts, + strict, + extended_attributes, + uuid, + quantities, + vspec, + units, + types, + types_output, + overlays, + expand + ) + + except VSpecError as e: + log.error("There was a problem with loading of VSS for processing.") + log.error("Error: %s", e) + exit(255) + + # Get the VSS version from the vss_tree::VersionVSS + vss_version = __get_version_vss(tree) + + __setup_environment(output_namespace, vss_version, split_depth) + + included_signals = None + included_branches = None + included_signals_input = None + + if signals_file is not None: + log.info("Using signals from:\n '%s'\n", signals_file) + + with open(signals_file, 'r') as f: + included_signals_input = f.read().splitlines() + + else: + log.info("No signals selected.\nCreating model for the whole tree.\n") + + log.info( + "Update output: '%s' with ESMF namespace: '%s' and VSS Version: '%s'.\n", + target_folder, + output_namespace, + cnfg.VSPEC_VERSION + ) + + # TODO: shall we use Path for target_folder or keep it as TEXT + # if not target_folder.name.endswith('/') and not target_folder.name.endswith('\\'): + # target_folder = Path(f"{target_folder.name}/") + + # target_folder = Path(f"{target_folder.name}/{output_namespace}/{cnfg.VSPEC_VERSION}") + + if not target_folder.endswith('/') and not target_folder.endswith('\\'): + target_folder = f"{target_folder}\\" + + target_folder = f"{target_folder}{output_namespace}\\{cnfg.VSPEC_VERSION}" + + log.info("Generating SAMM output...\n") + + if (included_signals_input is not None): + included_signals = [] + included_branches = [] + for signal in included_signals_input: + path = signal.split('.') + included_signals.append(path[-1]) + if (len(path) > 1): + for x in path[:-1]: + if x not in included_branches: + # Add only unique entries (branches) + included_branches.append(x) + + if included_branches is not None: + log.info( + "Included branches:\n%s\n", + included_branches + ) + + if included_signals is not None: + log.info( + "Included signals:\n%s\n", + included_signals + ) + + if included_signals_input is not None: + log.info( + "Included paths:\n%s\n", + included_signals_input + ) + + try: + parsed_tree_uri = None + + # NOTE: below used parse_vss_tree function will store generated RDF Graph to dedicated TTL file + if included_signals_input is not None and ( + type(included_signals_input) is list + and len(included_signals_input) > 0 + ): + # Filter the VSS tree based on included_signals_input + filtered_vss_tree = vssHelper.filter_vss_tree(tree, included_signals_input) # type: ignore + + if filtered_vss_tree is not None: + # Parse the filtered_vss_tree to AME TTL. + parsed_tree_uri = ttlHelper.parse_vss_tree(target_folder, filtered_vss_tree, split) # type: ignore + + else: + # Parse the whole tree as usual + parsed_tree_uri = ttlHelper.parse_vss_tree(target_folder, tree, split) # type: ignore + + else: + # TODO: parse_vss_tree below and above takes target_folder as string + # there is no auto-wrapping, therefore we need to convert the full target_folder to str. + # Therefore, if we go with target_folder to be handled as Path, we need to pass it as string, + # using: str(target_folder) OR update called function and underlying ones + # to work with Path target_folder and not str. + + # Work with vss_tree as usual + parsed_tree_uri = ttlHelper.parse_vss_tree(target_folder, tree, split) # type: ignore + + if parsed_tree_uri != 'DEPRECATED': + log.info( + "\nVSS to ESMF - SAMM processing - COMPLETED\n\nAll ttl files are located in: '%s'\n\n", + target_folder + ) + else: + log.warning( + "VSS to ESMF - SAMM processing - COMPLETED\n\n" + "VSS tree was not converted because it is DEPRECATED.\n\n" + ) + + except VSpecError as e: + log.error("\nThere was a problem with VSS to ESMF processing.") + log.error("\nError: %s", e) + exit(255)