diff --git a/docker/run_local_docker.sh b/docker/run_local_docker.sh index e17ae5a63a41..9fd8eb457fbe 100755 --- a/docker/run_local_docker.sh +++ b/docker/run_local_docker.sh @@ -164,4 +164,3 @@ sleep 60 # Sleep for 60 seconds to make sure the elasticsearch reindexing from U tput setaf 2 echo "✔ OpenMetadata is up and running" - diff --git a/ingestion/setup.py b/ingestion/setup.py index 9f44a9e80120..0963875a616a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -246,6 +246,7 @@ "sagemaker": {VERSIONS["boto3"]}, "salesforce": {"simple_salesforce==1.11.4"}, "sap-hana": {"hdbcli", "sqlalchemy-hana"}, + "sas": {}, "singlestore": {VERSIONS["pymysql"]}, "sklearn": {VERSIONS["scikit-learn"]}, "snowflake": {VERSIONS["snowflake"]}, diff --git a/ingestion/src/metadata/examples/workflows/sas.yaml b/ingestion/src/metadata/examples/workflows/sas.yaml new file mode 100644 index 000000000000..2bff249da1af --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/sas.yaml @@ -0,0 +1,28 @@ +source: + type: sas + serviceName: local_sas + serviceConnection: + config: + type: SAS + username: username + password: password + serverHost: http://your-server-host.org + datatables: True + dataTablesCustomFilter: None + reports: False + reportsCustomFilter: None + dataflows: False + dataflowsCustomFilter: None + sourceConfig: + config: + type: DatabaseMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + loggerLevel: DEBUG + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDQ3NDY0MzYsImV4cCI6bnVsbH0.qRWByaRw3F1B0bDqdpmxDsHEUx9Npk5VNelpabuvmVERjIG8AY88p1dv5gBME6Y1-kfTtCAMtSxsll0_gTR35D1foVdXgGMRAPyNXH0JHRpENBnT1V3OVO0yRWmeqsp5K7yqiaVa-CeqxitSgYrns58BFRD_vFX5vxMHirFBrFddH7b8af8823a8Oh-wMDCuJJd7Ya61Kv6gUpssE7Y403PwomLK6pE7gsAgZ5YOyjHlQ889C9z8oLQ268BX3ndmTp6t1J7MYgOEqeIzeCoBQZf6aBdyCqYajB3TTwb3SHFz2TEYF5xeLvYlZ77ek3l22m6Ehh5d4t2jB-ZOFDUkqA \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/database/sas/__init__.py b/ingestion/src/metadata/ingestion/source/database/sas/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/ingestion/src/metadata/ingestion/source/database/sas/client.py b/ingestion/src/metadata/ingestion/source/database/sas/client.py new file mode 100644 index 000000000000..f61693aad578 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sas/client.py @@ -0,0 +1,184 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" +Client to interact with SAS Viya apis +""" + +# pylint: disable=protected-access +import requests + +from metadata.generated.schema.entity.services.connections.database.sasConnection import ( + SASConnection, +) +from metadata.ingestion.ometa.client import REST, APIError, ClientConfig +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class SASClient: + """ + Client to interact with SAS Information Catalog + """ + + def __init__(self, config: SASConnection): + self.config: SASConnection = config + self.auth_token = get_token( + config.serverHost, config.username, config.password.get_secret_value() + ) + client_config: ClientConfig = ClientConfig( + base_url=config.serverHost, + auth_header="Authorization", + auth_token=self.get_auth_token, + api_version="", + allow_redirects=True, + verify=False, + ) + self.client = REST(client_config) + # custom setting + self.enable_datatables = config.datatables + self.custom_filter_datatables = config.dataTablesCustomFilter + self.enable_reports = config.reports + self.custom_filter_reports = config.reportsCustomFilter + self.enable_dataflows = config.dataflows + self.custom_filter_dataflows = config.dataflowsCustomFilter + + def check_connection(self): + """ + Check metadata connection to SAS + """ + check_list = [] + if self.enable_datatables: + check_list.append("datasets") + if self.enable_reports: + check_list.append("reports") + if self.enable_dataflows: + check_list.append("dataflows") + + for asset in check_list: + self.list_assets(asset) + + def get_instance(self, instance_id): + endpoint = f"catalog/instances/{instance_id}" + headers = { + "Accept": "application/vnd.sas.metadata.instance.entity.detail+json", + } + response = self.client._request("GET", path=endpoint, headers=headers) + if "error" in response.keys(): + raise APIError(response["error"]) + return response + + def get_information_catalog_link(self, instance_id): + return f"{self.config.serverHost}/SASInformationCatalog/details/~fs~catalog~fs~instances~fs~{instance_id}" + + def list_assets(self, assets): + """ + Get all assets based on asset types + """ + if assets == "datasets": + enable_asset = self.enable_datatables + asset_filter = self.custom_filter_datatables + elif assets == "reports": + enable_asset = self.enable_reports + asset_filter = self.custom_filter_reports + elif assets == "dataflows": + enable_asset = self.enable_dataflows + asset_filter = self.custom_filter_dataflows + + logger.debug( + f"Configuration for {assets}: enable {assets} - {enable_asset}, " + f"custom {assets} filter - {asset_filter}" + ) + endpoint = ( + f"catalog/search?indices={assets}&q=" + # f"{asset_filter if str(asset_filter) != 'None' else '*'}" + f"{asset_filter if str(asset_filter) != 'None' else '*'}&limit=10" # TODO: MAKE THE CHANGE + ) + headers = {"Accept-Item": "application/vnd.sas.metadata.instance.entity+json"} + response = self.client._request("GET", path=endpoint, headers=headers) + if "error" in response.keys(): + raise APIError(response["error"]) + return response["items"] + + def get_views(self, query): + endpoint = "catalog/instances" + headers = { + "Content-type": "application/vnd.sas.metadata.instance.query+json", + "Accept": "application/json", + } + logger.info(f"{query}") + response = self.client._request( + "POST", path=endpoint, data=query, headers=headers + ) + if "error" in response.keys(): + raise APIError(f"{response}") + return response + + def get_data_source(self, endpoint): + headers = { + "Accept-Item": 
"application/vnd.sas.data.source+json", + } + response = self.client._request("GET", path=endpoint, headers=headers) + logger.info(f"{response}") + if "error" in response.keys(): + raise APIError(response["error"]) + return response + + def get_report_link(self, resource, uri): + revised_uri = uri.replace("/", "%2F") + endpoint = f"/links/resources/{resource}?uri={revised_uri}" + return self.config.serverHost + endpoint + + def load_table(self, endpoint): + self.client.put(path=endpoint, data={}) + + def get_report_relationship(self, report_id): + endpoint = f"reports/commons/relationships/reports/{report_id}" + response = self.client.get(endpoint) + if "error" in response.keys(): + raise APIError(response["error"]) + dependencies = [] + for item in response["items"]: + if item["type"] == "Dependent": + dependencies.append(item) + return dependencies + + def get_resource(self, endpoint): + response = self.client.get(endpoint) + if "error" in response.keys(): + raise APIError(response["error"]) + return response + + def get_instances_with_param(self, data): + endpoint = f"catalog/instances?{data}" + response = self.client.get(endpoint) + if "error" in response.keys(): + raise APIError(response["error"]) + return response["items"] + + def get_auth_token(self): + return self.auth_token, 0 + + +def get_token(base_url, user, password): + endpoint = "/SASLogon/oauth/token" + payload = {"grant_type": "password", "username": user, "password": password} + headers = { + "Content-type": "application/x-www-form-urlencoded", + "Authorization": "Basic c2FzLmNsaTo=", + } + url = base_url + endpoint + response = requests.request( + "POST", url, headers=headers, data=payload, verify=False, timeout=10 + ) + text_response = response.json() + logger.info(f"this is user: {user}, password: {password}, text: {text_response}") + return response.json()["access_token"] diff --git a/ingestion/src/metadata/ingestion/source/database/sas/connection.py b/ingestion/src/metadata/ingestion/source/database/sas/connection.py new file mode 100644 index 000000000000..a11bd1204cd1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sas/connection.py @@ -0,0 +1,47 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Source connection handler +""" +from typing import Optional + +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, +) +from metadata.generated.schema.entity.services.connections.database.sasConnection import ( + SASConnection, +) +from metadata.ingestion.connections.test_connections import test_connection_steps +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.sas.client import SASClient +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +def get_connection(connection: SASConnection) -> SASClient: + return SASClient(connection) + + +def test_connection( + metadata: OpenMetadata, + client: SASClient, + service_connection: SASConnection, + automation_workflow: Optional[AutomationWorkflow] = None, +) -> None: + test_fn = {"CheckAccess": client.check_connection} + test_connection_steps( + metadata=metadata, + test_fn=test_fn, + service_type=service_connection.type.value, + automation_workflow=automation_workflow, + ) diff --git a/ingestion/src/metadata/ingestion/source/database/sas/extension_attr.py b/ingestion/src/metadata/ingestion/source/database/sas/extension_attr.py new file mode 100644 index 000000000000..c23fa6e1927b --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sas/extension_attr.py @@ -0,0 +1,103 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Extension attributes +""" + +TABLE_CUSTOM_ATTR = [ + # Dataset attributes + { + "name": "analysisTimeStamp", + "description": "The timestamp indicating when this object was last analyzed.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "creator", + "description": "The creator/author of this object.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "editor", + "description": "Specifies the Person who edited the object.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "rowCount", + "description": "Number of rows in the data set.", + "propertyType": {"id": "INT_TYPE", "type": "type"}, + }, + { + "name": "columnCount", + "description": "Number of columns in the data set.", + "propertyType": {"id": "INT_TYPE", "type": "type"}, + }, + { + "name": "dataSize", + "description": "Size of the data set in bytes.", + "propertyType": {"id": "INT_TYPE", "type": "type"}, + }, + { + "name": "completenessPercent", + "description": "The percentage of completeness for this data set.", + "propertyType": {"id": "INT_TYPE", "type": "type"}, + }, + { + "name": "dateCreated", + "description": "The date on which the object was created.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "dateModified", + "description": "The date on which the object was most recently modified.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "source", + "description": "The context from which the referenced resource was obtained.", + "propertyType": { + "id": "STRING_TYPE", + "type": "type", + }, + }, + # SAS Table attributes + { + "name": "CASLIB", + "description": "The name of the CAS library for this table.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "casHost", + "description": "The CAS host for the library for this table.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "engineName", + "description": "The name of the SAS data access engine used to connect to data.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "casPort", + "description": "The CAS port for the library for this table.", + "propertyType": {"id": "INT_TYPE", "type": "type"}, + }, + # CAS Table attributes + { + "name": "sourceName", + "description": "Name of the file source for this data set.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, + { + "name": "sourceCaslib", + "description": "Name of the caslib source for this data set.", + "propertyType": {"id": "STRING_TYPE", "type": "type"}, + }, +] diff --git a/ingestion/src/metadata/ingestion/source/database/sas/metadata.py b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py new file mode 100644 index 000000000000..cda4f6b75c54 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/sas/metadata.py @@ -0,0 +1,908 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +SAS source to extract metadata +""" + +# pylint: disable=protected-access,too-many-branches,too-many-locals +import copy +import json +import re +import time +import traceback +from typing import Any, Iterable, Optional, Tuple, Union + +from requests.exceptions import HTTPError + +from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest +from metadata.generated.schema.api.data.createDatabaseSchema import ( + CreateDatabaseSchemaRequest, +) +from metadata.generated.schema.api.data.createQuery import CreateQueryRequest +from metadata.generated.schema.api.data.createStoredProcedure import ( + CreateStoredProcedureRequest, +) +from metadata.generated.schema.api.data.createTable import CreateTableRequest +from metadata.generated.schema.api.data.createTableProfile import ( + CreateTableProfileRequest, +) +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest +from metadata.generated.schema.api.services.createDashboardService import ( + CreateDashboardServiceRequest, +) +from metadata.generated.schema.entity.data.dashboard import Dashboard +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import ( + Column, + ColumnProfile, + Table, + TableProfile, +) +from metadata.generated.schema.entity.services.connections.dashboard.customDashboardConnection import ( + CustomDashboardConnection, + CustomDashboardType, +) +from metadata.generated.schema.entity.services.connections.database.sasConnection import ( + SASConnection, +) +from metadata.generated.schema.entity.services.dashboardService import ( + DashboardConnection, + DashboardServiceType, +) +from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import ( + DatabaseServiceMetadataPipeline, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.entityLineage import EntitiesEdge +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.models import Either, StackTraceError +from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.connections import get_connection +from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser +from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.sas.client import SASClient +from metadata.ingestion.source.database.sas.extension_attr import TABLE_CUSTOM_ATTR +from metadata.utils import fqn +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class SasSource( + DatabaseServiceSource +): # pylint: disable=too-many-instance-attributes,too-many-public-methods + """ + Implements the necessary methods to extract + Database metadata from SAS Database Source + """ + + config: WorkflowSource + sas_client: SASClient + + def __init__(self, config: WorkflowSource, metadata: OpenMetadata): + super().__init__() + self.config = config + self.metadata = metadata + self.source_config: DatabaseServiceMetadataPipeline = ( + self.config.sourceConfig.config + ) + self.service_connection = 
self.config.serviceConnection.__root__.config + + self.sas_client = get_connection(self.service_connection) + self.connection_obj = self.sas_client + self.test_connection() + + self.db_service_name = self.config.serviceName + self.db_name = None + self.db_schema_name = None + self.table_fqns = [] + + self.dashboard_service_name = None + self.chart_names = None + self.report_description = None + + self.add_table_custom_attributes() + + self.databases = None + self.database_schemas = None + + @classmethod + def create(cls, config_dict: dict, metadata: OpenMetadata): + logger.info(f"running create {config_dict}") + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: SASConnection = config.serviceConnection.__root__.config + if not isinstance(connection, SASConnection): + raise InvalidSourceException( + f"Expected SASConnection, but got {connection}" + ) + return cls(config, metadata) + + def _iter(self) -> Iterable[Either[Entity]]: + # create tables from sas dataSets + if self.sas_client.enable_datatables: + for table in self.sas_client.list_assets("datasets"): + yield from self.create_table_entity(table) + + if self.sas_client.enable_reports: + yield from self.create_dashboard_service("SAS_reports") + for report in self.sas_client.list_assets("reports"): + yield from self.process_report(report) + + if self.sas_client.enable_dataflows: + yield from self.create_dashboard_service("SAS_dataFlows") + for data_flow in self.sas_client.list_assets("dataflows"): + yield from self.process_dataflow(data_flow) + + def process_report(self, report): + self.table_fqns = [] + logger.info(f"Ingesting report: {report}") + report_instance = self.sas_client.get_instance(report["id"]) + for table in self.get_report_tables( + report_instance["resourceId"].split("/")[-1] + ): + yield from self.create_table_entity(table) + yield from self.create_report_entity(report_instance) + + def process_dataflow(self, data_flow): + """ + Process dataflow assets + """ + self.table_fqns = [] + logger.info(f"Ingesting dataflow: {data_flow}") + data_flow_instance = self.sas_client.get_instance(data_flow["id"]) + if not data_flow_instance.get("relationships"): + logger.warning(f"No relationships are found for {data_flow['name']}") + return + input_asset_ids = [ + rel["endpointId"] + for rel in data_flow_instance["relationships"] + if rel["definitionId"] == "6179884b-91ec-4236-ad6b-52c7f454f217" + ] + output_asset_ids = [ + rel["endpointId"] + for rel in data_flow_instance["relationships"] + if rel["definitionId"] == "e1349270-fdbb-4231-9841-79917a307471" + ] + for input_asset in (self.sas_client.get_instance(id) for id in input_asset_ids): + yield from self.create_table_entity(input_asset) + input_fqns = copy.deepcopy(self.table_fqns) + self.table_fqns = [] + for output_asset in ( + self.sas_client.get_instance(id) for id in output_asset_ids + ): + yield from self.create_table_entity(output_asset) + yield from self.create_data_flow_entity( + data_flow_instance, input_fqns, copy.deepcopy(self.table_fqns) + ) + + def create_database_alt(self, db): + """ + Find the name of the mock DB service + Use the link to the parent of the resourceId of the datastore itself, and use its name + Then the db service name will be the provider id + """ + data_store_endpoint = db["resourceId"][1:] + logger.info(f"{data_store_endpoint}") + data_store_resource = self.sas_client.get_data_source(data_store_endpoint) + + data_store_parent_endpoint = "" + for link in data_store_resource["links"]: + if link["rel"] == "parent": + 
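+                # the data store's "parent" link points at its provider; the provider's id becomes the database name below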
data_store_parent_endpoint = link["uri"][1:] + break + + data_store_parent = self.sas_client.get_data_source(data_store_parent_endpoint) + self.db_name = data_store_parent["id"] + database = CreateDatabaseRequest( + name=data_store_parent["id"], + displayName=data_store_parent["name"], + service=self.db_service_name, + ) + database_entity = self.metadata.create_or_update(data=database) + return database_entity + + def create_database_schema(self, table): + """ + create database schema + """ + try: + context = table["resourceId"].split("/")[3] + + provider = context.split("~")[0] + self.db_name = provider + "." + context.split("~")[2] + self.db_schema_name = context.split("~")[4] + + database = CreateDatabaseRequest( + name=self.db_name, + displayName=self.db_name, + service=self.config.serviceName, + ) + database = self.metadata.create_or_update(data=database) + + db_schema = CreateDatabaseSchemaRequest( + name=self.db_schema_name, database=database.fullyQualifiedName + ) + db_schema_entity = self.metadata.create_or_update(db_schema) + return db_schema_entity + + except HTTPError as _: + # Find the "database" entity in Information Catalog + # First see if the table is a member of the library through the relationships attribute + # Or we could use views to query the dataStores + data_store_data_sets = "4b114f6e-1c2a-4060-9184-6809a612f27b" + data_store_id = None + for relation in table["relationships"]: + if relation["definitionId"] != data_store_data_sets: + continue + data_store_id = relation["endpointId"] + break + + if data_store_id is None: + # log error due to exclude amount of work with tables in dataTables + logger.error("Data store id should not be none") + return None + + data_store = self.sas_client.get_instance(data_store_id) + database = self.create_database_alt(data_store) + self.db_schema_name = data_store["name"] + db_schema = CreateDatabaseSchemaRequest( + name=data_store["name"], database=database.fullyQualifiedName + ) + db_schema_entity = self.metadata.create_or_update(db_schema) + return db_schema_entity + + def create_columns_alt(self, table): + """ + Create columns by loading the table when they are not already loaded + """ + columns_endpoint = "" + load_endpoint = "" + for link in table["links"]: + if link["rel"] == "columns": + columns_endpoint = link["uri"][1:] + "?limit=1000" + if link["rel"] == "load": + load_endpoint = link["uri"][1:] + if load_endpoint: + self.sas_client.load_table(load_endpoint) + columns_resource = self.sas_client.get_resource(columns_endpoint) + columns = [] + for item in columns_resource["items"]: + datatype = item["type"] + if datatype == "num": + datatype = "numeric" + parsed_string = ColumnTypeParser._parse_datatype_string(datatype) + col_name = item["name"] + parsed_string["name"] = col_name.replace('"', "'") + parsed_string["ordinalPosition"] = item["index"] + if datatype.lower() in ["char", "varchar", "binary", "varbinary"]: + parsed_string["dataLength"] = 0 + col = Column(**parsed_string) + columns.append(col) + return columns + + def get_entities_using_view(self, table_id): + """ + Get all the col_entity_instances related to table using views + """ + views_query = { + "query": "match (t:dataSet)-[r:dataSetDataFields]->(c:dataField) return t,r,c", + "parameters": {"t": {"id": f"{table_id}"}}, + } + views_data = json.dumps(views_query) + views = self.sas_client.get_views(views_data) + if not views.get("entities"): # if the resource is not a table + return None, None + + col_entity_instances = views["entities"] + # find datatables in 
col_entity_instances + table_entity_instance = list( + filter(lambda x: "Table" in x["type"], col_entity_instances) + ) + if len(table_entity_instance) == 1: + table_entity_instance = table_entity_instance[0] + + return col_entity_instances, table_entity_instance + + def get_table_fqn(self, table_name): + return fqn.build( + self.metadata, + entity_type=Table, + service_name=self.db_service_name, + database_name=self.db_name, + schema_name=self.db_schema_name, + table_name=table_name, + ) + + def create_columns_and_profiles(self, entities, table_entity_instance): + """ + Create columns and profiles + """ + columns = [] + col_profile_list = [] + for entity in entities: + if entity["id"] == table_entity_instance["id"]: + continue + if "Column" not in entity["type"]: + continue + col_attributes = entity["attributes"] + if "casDataType" in col_attributes: + datatype = col_attributes["casDataType"] + else: + datatype = col_attributes["dataType"] + if datatype == "num": + datatype = "numeric" + parsed_string = ColumnTypeParser._parse_datatype_string(datatype) + col_name = entity["name"] + parsed_string["name"] = col_name.replace('"', "'") + parsed_string["ordinalPosition"] = col_attributes["ordinalPosition"] + # Column profile to be added + attr_map = { + "mean": "mean", + "median": "sum", + "min": "min", + "max": "max", + "standardDeviation": "stddev", + "missingCount": "nullCount", + "completenessPercent": "valuesPercentage", + "uniquenessPercent": "uniqueProportion", + "cardinalityCount": "distinctCount", + "skewness": "nonParametricSkew", + "quantiles25": "firstQuartile", + "quantiles50": "median", + "quantiles75": "thirdQuartile", + "mismatchedCount": "missingCount", + "charsMinCount": "minLength", + "charsMaxCount": "maxLength", + } + col_profile_dict = {} + for attr, mapped_attr in attr_map.items(): + if attr in col_attributes: + if attr == "uniquenessPercent": + col_profile_dict[mapped_attr] = col_attributes[attr] / 100 + else: + col_profile_dict[mapped_attr] = col_attributes[attr] + if "rowCount" in table_entity_instance["attributes"]: + col_profile_dict["valuesCount"] = table_entity_instance["attributes"][ + "rowCount" + ] + if "valuesCount" in col_profile_dict: + if "distinctCount" in col_profile_dict: + col_profile_dict["distinctProportion"] = ( + col_profile_dict["distinctCount"] + / col_profile_dict["valuesCount"] + ) + col_profile_dict["uniqueCount"] = col_profile_dict["distinctCount"] + if "nullCount" in col_profile_dict: + col_profile_dict["nullProportion"] = ( + col_profile_dict["nullCount"] / col_profile_dict["valuesCount"] + ) + if "missingCount" in col_profile_dict: + col_profile_dict["missingPercentage"] = ( + col_profile_dict["missingCount"] + / col_profile_dict["valuesCount"] + ) + col_profile_dict["validCount"] = ( + col_profile_dict["valuesCount"] + - col_profile_dict["missingCount"] + ) + timestamp = time.time() - 100000 + col_profile_dict["timestamp"] = timestamp + col_profile_dict["name"] = parsed_string["name"] + column_profile = ColumnProfile(**col_profile_dict) + col_profile_list.append(column_profile) + parsed_string["profile"] = column_profile + if datatype.lower() in ["char", "varchar", "binary", "varbinary"]: + if "charsMaxCount" in col_attributes: + parsed_string["dataLength"] = col_attributes["charsMaxCount"] + else: + parsed_string["dataLength"] = 0 + logger.info(f"This is parsed string: {parsed_string}") + col = Column(**parsed_string) + columns.append(col) + return columns, col_profile_list + + def create_table_entity(self, table) -> 
Iterable[Either[CreateTableRequest]]: + # pylint: disable=global-variable-undefined + """ + Create database + db service & Create database schema + """ + logger.info(f"Ingesting table: {table}") + global table_entity + global table_fqn + + table_entity, table_fqn = None, None + + try: + table_url = self.sas_client.get_information_catalog_link(table["id"]) + col_entity_instances, table_entity_instance = self.get_entities_using_view( + table["id"] + ) + logger.info(f"table entity: {table_entity_instance}") + + if not table_entity_instance: + return + + table_name = table_entity_instance["name"] + table_extension = table_entity_instance["attributes"] + + # create tables in database + database_schema = self.create_database_schema(table_entity_instance) + + # find the table entity to see if it already exists + table_fqn = self.get_table_fqn(table_name) + table_entity = self.metadata.get_by_name( + entity=Table, fqn=table_fqn, fields=["extension"] + ) + + logger.debug(table_entity) + + # if the table entity already exists, we don't need to create it again + # only update it when either the sourceUrl or analysisTimeStamp changed + if not table_entity or ( + table_url != table_entity.sourceUrl.__root__ + or table_entity.extension.__root__.get("analysisTimeStamp") + != table_extension.get("analysisTimeStamp") + ): + + # create the columns of the table + columns, col_profile_list = self.create_columns_and_profiles( + col_entity_instances, table_entity_instance + ) + + # set description based on col counts + if len(columns) == 0: + table_description = ( + "Table has not been analyzed. " + f'Head over to ' + f"SAS Information Catalog to analyze the table." + ) + try: + # Create columns alternatively + table_resource = self.sas_client.get_resource( + table_entity_instance["resourceId"][1:] + ) + columns = self.create_columns_alt(table_resource) + except HTTPError as http_err: + table_description = f"{str(http_err)} This table does not exist in the file path" + else: + table_description = ( + f"Last analyzed: {table_extension.get('analysisTimeStamp')}. " + f'Visit SAS Information Catalog' + f" for more information." 
+ ) + + # build table extension attr + for attr in table_extension: + if isinstance(table_extension[attr], bool): + table_extension[attr] = str(table_extension[attr]) + + custom_attributes = [ + custom_attribute["name"] for custom_attribute in TABLE_CUSTOM_ATTR + ] + extension_attributes = { + attr: value + for attr, value in table_extension.items() + if attr in custom_attributes + } + + # create table request + logger.info( + f"schema: {table['id']}, {self.db_service_name}, {self.db_name}, {self.db_schema_name}" + ) + table_request = CreateTableRequest( + name=table_name, + sourceUrl=table_url, + description=table_description, + columns=columns, + databaseSchema=database_schema.fullyQualifiedName, + extension=extension_attributes, + ) + + yield Either(right=table_request) + + # find the table entity to see if it already exists + yield from self.create_lineage_table_source(table_extension, table_name) + + table_entity = self.metadata.get_by_name( + entity=Table, fqn=self.get_table_fqn(table_name) + ) + # update the description + logger.debug( + f"Updating description for {table_entity.id.__root__} with {table_description}" + ) + self.metadata.client.patch( + path=f"/tables/{table_entity.id.__root__}", + data=json.dumps( + [ + { + "op": "add", + "path": "/description", + "value": table_description, + } + ] + ), + ) + + # update the custom properties + logger.debug( + f"Updating custom properties for {table_entity.id.__root__} with {extension_attributes}" + ) + self.metadata.client.patch( + path=f"/tables/{table_entity.id.__root__}", + data=json.dumps( + [ + { + "op": "add", + "path": "/extension", + "value": extension_attributes, + } + ] + ), + ) + + # quit updating table profile if table doesn't exist + if ( + table_description + and "This table does not exist in the file path" + in table_description + ): + return + + # update table profile + table_profile_dict = { + "timestamp": time.time() - 100000, + "createDateTime": table_entity_instance["creationTimeStamp"], + "rowCount": ( + 0 + if "rowCount" not in table_extension + else table_extension["rowCount"] + ), + "columnCount": ( + 0 + if "columnCount" not in table_extension + else table_extension["columnCount"] + ), + "sizeInByte": ( + 0 + if "dataSize" not in extension_attributes + else table_extension["dataSize"] + ), + } + + # create Profiles & Data Quality Column + table_profile_request = CreateTableProfileRequest( + tableProfile=TableProfile(**table_profile_dict), + columnProfile=col_profile_list, + ) + self.metadata.client.put( + path=f"{self.metadata.get_suffix(Table)}/{table_entity.id.__root__}/tableProfile", + data=table_profile_request.json(), + ) + + except Exception as exc: + logger.error(f"table failed to create: {table}") + yield Either( + left=StackTraceError( + name=table_name, + error=f"Unexpected exception to create table [{table_name}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + finally: + if table_entity: + self.table_fqns.append(table_fqn) + + def create_lineage_table_source(self, table_extension, table_name): + """ + create lineage between the table and its source + """ + if "sourceName" in table_extension and table_extension["sourceName"] != "": + source_name = table_extension["sourceName"] + # see if the source table already exists + source_table_fqn = self.get_table_fqn(source_name) + logger.debug(f"source_table_fqn for sourceTable is {source_table_fqn}") + source_table_entity = self.metadata.get_by_name( + entity=Table, fqn=source_table_fqn + ) + target_table_entity = self.metadata.get_by_name( + 
entity=Table, fqn=self.get_table_fqn(table_name) + ) + + # process to create lineage if source table doesn't exist + if not source_table_entity: + sanitized_source_name = re.sub("[@!#$%^&*]", "", source_name) + param = f"filter=contains(name, '{sanitized_source_name}')" + get_instances_with_param = self.sas_client.get_instances_with_param( + param + ) + if get_instances_with_param and len(get_instances_with_param) == 1: + source_table = get_instances_with_param[0] + yield from self.create_table_entity(source_table) + + source_table_entity = self.metadata.get_by_name( + entity=Table, fqn=source_table_fqn + ) + + if source_table_entity: + yield from self.create_table_lineage( + source_table_entity, target_table_entity + ) + + def add_table_custom_attributes(self): + """ + Adding custom attribute from extension_attr.py + """ + string_type = self.metadata.client.get(path="/metadata/types/name/string")["id"] + integer_type = self.metadata.client.get(path="/metadata/types/name/integer")[ + "id" + ] + for attr in TABLE_CUSTOM_ATTR: + if attr["propertyType"]["id"] == "STRING_TYPE": + attr["propertyType"]["id"] = string_type + else: + attr["propertyType"]["id"] = integer_type + table_type = self.metadata.client.get(path="/metadata/types/name/table") + table_id = table_type["id"] + for attr in TABLE_CUSTOM_ATTR: + self.metadata.client.put( + path=f"/metadata/types/{table_id}", data=json.dumps(attr) + ) + + def create_table_lineage(self, from_entity, to_entity): + yield self.create_lineage_request("table", "table", from_entity, to_entity) + + def create_dashboard_service(self, dashboard_service_name): + self.dashboard_service_name = dashboard_service_name + + try: + dashboard_service_request = CreateDashboardServiceRequest( + name=dashboard_service_name, + serviceType=DashboardServiceType.CustomDashboard, + connection=DashboardConnection( + config=CustomDashboardConnection( + type=CustomDashboardType.CustomDashboard, + sourcePythonClass="metadata.ingestion.source.database.customdatabase.metadata.SASDB", + ) + ), + ) + yield Either(right=dashboard_service_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=dashboard_service_name, + error=f"Unexpected exception to create dashboard service for [{dashboard_service_name}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + + def get_report_tables(self, report_id): + """ + Get datasets related to the report + """ + report_tables = self.sas_client.get_report_relationship(report_id) + table_instances = [] + self.report_description = [] + # loop through each relatedResourceUri from relationships + for table in report_tables: + table_uri = table["relatedResourceUri"][1:] + try: + # load the table if it can be found + table_resource = self.sas_client.get_resource(table_uri) + table_data_resource = table_resource["tableReference"]["tableUri"] + param = f"filter=eq(resourceId,'{table_data_resource}')" + if "state" in table_resource and table_resource["state"] == "unloaded": + self.sas_client.load_table(table_uri + "/state?value=loaded") + + except HTTPError as e: + # append http error to table description if it can't be found + logger.error(f"table_uri: {table_uri}") + self.report_description.append(str(e)) + name_index = table_uri.rindex("/") + table_name = table_uri[name_index + 1 :] + param = f"filter=eq(name,'{table_name}')" + + get_instances_with_param = self.sas_client.get_instances_with_param(param) + if get_instances_with_param and len(get_instances_with_param) == 1: + table_instance = get_instances_with_param[0] + 
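+                # a single unambiguous catalog match: record it as one of the report's source tables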
table_instances.append(table_instance) + return table_instances + + def create_lineage_request(self, from_type, in_type, from_entity, to_entity): + return Either( + right=AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, type=from_type + ), + toEntity=EntityReference(id=to_entity.id.__root__, type=in_type), + ) + ) + ) + + def create_report_entity(self, report): + """ + Create report entity and its corresponding lineage to the datasets + """ + report_id = report["id"] + report_name = report["name"] + try: + report_resource = report["resourceId"] + report_url = self.sas_client.get_report_link("report", report_resource) + self.report_description = ( + str(self.report_description) if self.report_description else None + ) + report_request = CreateDashboardRequest( + name=report_id, + displayName=report_name, + sourceUrl=report_url, + charts=self.chart_names, + service=self.dashboard_service_name, + description=self.report_description, + ) + yield Either(right=report_request) + + dashboard_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.dashboard_service_name, + dashboard_name=report_id, + ) + + dashboard_entity = self.metadata.get_by_name( + entity=Dashboard, fqn=dashboard_fqn + ) + table_entities = [] + for table in self.table_fqns: + entity_instance = self.metadata.get_by_name(entity=Table, fqn=table) + table_entities.append(entity_instance) + for entity in table_entities: + yield self.create_lineage_request( + "table", "dashboard", entity, dashboard_entity + ) + except Exception as exc: + logger.error(f"report failed to create: {report}") + yield Either( + left=StackTraceError( + name=report_name, + error=f"Unexpected exception to create report [{report['id']}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + + def create_data_flow_entity(self, data_flow, input_fqns, output_fqns): + """ + Create data flow and its corresponding lineage with the input & output table + """ + data_flow_id = data_flow["id"] + data_flow_resource = data_flow["resourceId"] + + try: + data_flow_url = self.sas_client.get_report_link( + "dataFlow", data_flow_resource + ) + data_flow_request = CreateDashboardRequest( + name=data_flow_id, + displayName=data_flow["name"], + service=self.dashboard_service_name, + sourceUrl=data_flow_url, + ) + yield Either(right=data_flow_request) + + dashboard_fqn = fqn.build( + self.metadata, + entity_type=Dashboard, + service_name=self.dashboard_service_name, + dashboard_name=data_flow_id, + ) + + dashboard_entity = self.metadata.get_by_name( + entity=Dashboard, fqn=dashboard_fqn + ) + + input_entities = [ + self.metadata.get_by_name(entity=Table, fqn=input_entity) + for input_entity in input_fqns + ] + output_entities = [ + self.metadata.get_by_name(entity=Table, fqn=output_entity) + for output_entity in output_fqns + ] + + for entity in input_entities: + yield self.create_lineage_request( + "table", "dashboard", entity, dashboard_entity + ) + for entity in output_entities: + yield self.create_lineage_request( + "dashboard", "table", dashboard_entity, entity + ) + except Exception as exc: + logger.error(f"dataflow failed to create: {data_flow}") + yield Either( + left=StackTraceError( + name=data_flow_id, + error=f"Unexpected exception to create data flow [{data_flow_id}]: {exc}", + stack_trace=traceback.format_exc(), + ) + ) + + def get_database_names(self) -> Iterable[str]: + for database in self.databases: + yield database + + def yield_database( + self, database_name: str + ) -> 
Iterable[Either[CreateDatabaseRequest]]: + yield Either( + right=CreateDatabaseRequest( + name=database_name, + service=self.context.database_service, + ) + ) + + def get_database_schema_names(self) -> Iterable[Tuple[str, str]]: + for database, database_schemas in self.database_schemas.items(): + for database_schema in database_schemas: + yield database, database_schema + + def yield_database_schema( + self, schema_name: Tuple[str, str] + ) -> Iterable[Either[CreateDatabaseSchemaRequest]]: + yield Either( + right=CreateDatabaseSchemaRequest( + name=schema_name[1], + database=fqn.build( + metadata=self.metadata, + entity_type=Database, + service_name=self.context.database_service, + database_name=schema_name[0], + ), + ) + ) + + def yield_tag( + self, schema_name: str + ) -> Iterable[Either[OMetaTagAndClassification]]: + """No tags to send""" + + def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]: + yield from [] + + def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, list]]]: + """Not implemented""" + + def yield_table( + self, table_name_and_type: Tuple[str, list] + ) -> Iterable[Either[Entity]]: + """Not implemented""" + + def get_stored_procedures(self) -> Iterable[Any]: + """Not implemented""" + + def yield_stored_procedure( + self, stored_procedure: Any + ) -> Iterable[Either[CreateStoredProcedureRequest]]: + """Not implemented""" + + def yield_procedure_lineage_and_queries( + self, + ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]: + yield from [] + + def close(self) -> None: + pass diff --git a/openmetadata-docs/content/v1.1.x/how-to-guides/quick-start-guide-for-admins/how-to-ingest-metadata.md b/openmetadata-docs/content/v1.1.x/how-to-guides/quick-start-guide-for-admins/how-to-ingest-metadata.md index db250d4b896a..eb959ddcf5a2 100644 --- a/openmetadata-docs/content/v1.1.x/how-to-guides/quick-start-guide-for-admins/how-to-ingest-metadata.md +++ b/openmetadata-docs/content/v1.1.x/how-to-guides/quick-start-guide-for-admins/how-to-ingest-metadata.md @@ -27,7 +27,7 @@ href="/connectors"%} Refer to the Docs to ingest metadata from multiple sources - Databases, Dashboards, Pipelines, ML Models, Messaging, Storage, as well as Metadata services. {%/inlineCallout%} -- **Database Services:** [Athena](/connectors/database/athena), [AzureSQL](/connectors/database/azuresql), [BigQuery](/connectors/database/bigquery), [Clickhouse](/connectors/database/clickhouse), [Databricks](/connectors/database/databricks), [Datalake](/connectors/database/datalake), [DB2](/connectors/database/db2), [DeltaLake](/connectors/database/deltalake), [Domo Database](/connectors/database/domo-database), [Druid](/connectors/database/druid), [DynamoDB](/connectors/database/dynamodb), [Glue](/connectors/database/glue), [Hive](/connectors/database/hive), [Impala](/connectors/database/impala), [MariaDB](/connectors/database/mariadb), [MongoDB](/connectors/database/mongodb), [MSSQL](/connectors/database/mssql), [MySQL](/connectors/database/mysql), [Oracle](/connectors/database/oracle), [PinotDB](/connectors/database/pinotdb), [Postgres](/connectors/database/postgres), [Presto](/connectors/database/presto), [Redshift](/connectors/database/redshift), [Salesforce](/connectors/database/salesforce), [SAP Hana](/connectors/database/sap-hana), [SingleStore](/connectors/database/singlestore), [Snowflake](/connectors/database/snowflake), [SQLite](/connectors/database/sqlite), [Trino](/connectors/database/trino), and [Vertica](/connectors/database/vertica). 
+- **Database Services:** [Athena](/connectors/database/athena), [AzureSQL](/connectors/database/azuresql), [BigQuery](/connectors/database/bigquery), [Clickhouse](/connectors/database/clickhouse), [Databricks](/connectors/database/databricks), [Datalake](/connectors/database/datalake), [DB2](/connectors/database/db2), [DeltaLake](/connectors/database/deltalake), [Domo Database](/connectors/database/domo-database), [Druid](/connectors/database/druid), [DynamoDB](/connectors/database/dynamodb), [Glue](/connectors/database/glue), [Hive](/connectors/database/hive), [Impala](/connectors/database/impala), [MariaDB](/connectors/database/mariadb), [MongoDB](/connectors/database/mongodb), [MSSQL](/connectors/database/mssql), [MySQL](/connectors/database/mysql), [Oracle](/connectors/database/oracle), [PinotDB](/connectors/database/pinotdb), [Postgres](/connectors/database/postgres), [Presto](/connectors/database/presto), [Redshift](/connectors/database/redshift), [Salesforce](/connectors/database/salesforce), [SAP Hana](/connectors/database/sap-hana), [SingleStore](/connectors/database/singlestore), [Snowflake](/connectors/database/snowflake), [SQLite](/connectors/database/sqlite), [Trino](/connectors/database/trino), [Vertica](/connectors/database/vertica), and [SAS](/connectors/database/sas). - **Dashboard Services:** [Domo Dashboard](/connectors/dashboard/domo-dashboard), [Looker](/connectors/dashboard/looker), [Metabase](/connectors/dashboard/metabase), [Mode](/connectors/dashboard/mode), [PowerBI](/connectors/dashboard/powerbi), [Qlik Sense](/connectors/dashboard/qliksense), [QuickSight](/connectors/dashboard/quicksight), [Redash](/connectors/dashboard/redash), [Superset](/connectors/dashboard/superset), and [Tableau](/connectors/dashboard/tableau). diff --git a/openmetadata-docs/content/v1.2.x/connectors/database/index.md b/openmetadata-docs/content/v1.2.x/connectors/database/index.md index 7e66479003d3..4bc3372d0af5 100644 --- a/openmetadata-docs/content/v1.2.x/connectors/database/index.md +++ b/openmetadata-docs/content/v1.2.x/connectors/database/index.md @@ -35,6 +35,7 @@ This is the supported list of connectors for Database Services: - [Trino](/connectors/database/trino) - [Unity Catalog](/connectors/database/unity-catalog) - [Vertica](/connectors/database/vertica) +- [SAS](/connectors/database/sas) If you have a request for a new connector, don't hesitate to reach out in [Slack](https://slack.open-metadata.org/) or open a [feature request](https://github.com/open-metadata/OpenMetadata/issues/new/choose) in our GitHub repo. diff --git a/openmetadata-docs/content/v1.2.x/connectors/database/sas/index.md b/openmetadata-docs/content/v1.2.x/connectors/database/sas/index.md new file mode 100644 index 000000000000..39c1ec29ed7e --- /dev/null +++ b/openmetadata-docs/content/v1.2.x/connectors/database/sas/index.md @@ -0,0 +1,46 @@ +--- +title: SAS +slug: /connectors/database/sas +--- + +# SAS + +{% partial file="/v1.2/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/sas/yaml"} /%} + +## Requirements + +## 1. 
SAS Metadata Ingestion + +Prepare the SAS Service and configure the Ingestion: + +{% partial + file="/v1.2/connectors/metadata-ingestion-ui.md" + variables={ + connector: "SAS", + selectServicePath: "/images/v1.2/connectors/sas/select-service.png", + addNewServicePath: "/images/v1.2/connectors/sas/add-new-service.png", + serviceConnectionPath: "/images/v1.2/connectors/sas/service-connection.png", +} +/%} + +{% stepsContainer %} +{% extraContent parentTagName="stepsContainer" %} + +#### Connection Details + +- **ServerHost**: Host and port of the SAS Viya deployment. +- **Username**: Username to connect to SAS Viya. This user should have privileges to read all the metadata in SAS Information Catalog. +- **Password**: Password to connect to SAS Viya. +- **Filter**: A filter expression specifying items for import. For more information see https://developer.sas.com/apis/rest/DataManagement/#catalog + +{% /extraContent %} + +{% partial file="/v1.2/connectors/test-connection.md" /%} + +{% partial file="/v1.2/connectors/metadata/configure-ingestion.md" /%} + +{% partial file="/v1.2/connectors/ingestion-schedule-and-deploy.md" /%} + +{% /stepsContainer %} + +{% partial file="/v1.2/connectors/troubleshooting.md" /%} diff --git a/openmetadata-docs/content/v1.2.x/connectors/database/sas/yaml.md b/openmetadata-docs/content/v1.2.x/connectors/database/sas/yaml.md new file mode 100644 index 000000000000..e6e798bce830 --- /dev/null +++ b/openmetadata-docs/content/v1.2.x/connectors/database/sas/yaml.md @@ -0,0 +1,118 @@ +--- +title: Run the SAS Connector Externally +slug: /connectors/database/sas/yaml +--- + +# Run the SAS Connector Externally + +In this section, we provide guides and references to use the SAS connector. + +Configure and schedule SAS metadata workflows from the OpenMetadata UI: + +- [Requirements](#requirements) +- [Metadata Ingestion](#metadata-ingestion) + + +{% partial file="/v1.2/connectors/external-ingestion-deployment.md" /%} + +## Requirements + +{%inlineCallout icon="description" bold="OpenMetadata 1.3.0 or later" href="/deployment"%} +To deploy OpenMetadata, check the Deployment guides. +{%/inlineCallout%} + + +### Python Requirements + +To run the SAS ingestion, you will need to install: + +```bash +pip3 install "openmetadata-ingestion[sas]" +``` + +## Metadata Ingestion + +All connectors are defined as JSON Schemas. +[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/sasConnection.json) +you can find the structure to create a connection to SAS. + +In order to create and run a Metadata Ingestion workflow, we will follow +the steps to create a YAML configuration able to connect to the source, +process the Entities if needed, and reach the OpenMetadata server. + +The workflow is modeled around the following +[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json) + +### 1. Define the YAML Config + +{% codePreview %} + +{% codeInfoContainer %} + +#### Source Configuration - Service Connection + +{% codeInfo srNumber=12 %} + +**serverHost**: Host and port of the SAS Viya deployment. + +{% /codeInfo %} + +{% codeInfo srNumber=13 %} + +**username**: Username to connect to SAS Viya. This user should have privileges to read all the metadata in SAS Information Catalog. + +{% /codeInfo %} + +{% codeInfo srNumber=14 %} + +**password**: Password to connect to SAS Viya. 
+
+{% /codeInfo %}
+
+#### Sink Configuration
+
+{% codeInfo srNumber=18 %}
+
+To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.
+
+{% /codeInfo %}
+
+{% partial file="/v1.2/connectors/yaml/workflow-config-def.md" /%}
+{% /codeInfoContainer %}
+
+{% codeBlock fileName="filename.yaml" %}
+
+```yaml
+source:
+  type: sas
+  serviceName: local_sas
+  serviceConnection:
+    config:
+      type: SAS
+```
+```yaml {% srNumber=12 %}
+      serverHost: http://localhost:10000
+```
+```yaml {% srNumber=13 %}
+      username: username
+```
+```yaml {% srNumber=14 %}
+      password: password
+```
+```yaml
+  sourceConfig:
+    config:
+      type: DatabaseMetadata
+```
+```yaml {% srNumber=18 %}
+sink:
+  type: metadata-rest
+  config: {}
+```
+
+{% partial file="/v1.2/connectors/yaml/workflow-config.md" /%}
+
+{% /codeBlock %}
+
+{% /codePreview %}
+
+{% partial file="/v1.2/connectors/yaml/ingestion-cli.md" /%}
diff --git a/openmetadata-docs/content/v1.2.x/connectors/index.md b/openmetadata-docs/content/v1.2.x/connectors/index.md
index 62271cf12ade..d5e59059d3a6 100644
--- a/openmetadata-docs/content/v1.2.x/connectors/index.md
+++ b/openmetadata-docs/content/v1.2.x/connectors/index.md
@@ -64,6 +64,7 @@ the following docs to run the Ingestion Framework in any orchestrator externally
 - [Trino](/connectors/database/trino)
 - [Unity Catalog](/connectors/database/unity-catalog)
 - [Vertica](/connectors/database/vertica)
+- [SAS](/connectors/database/sas)
 
 ## Dashboard Services
 
diff --git a/openmetadata-docs/content/v1.2.x/main-concepts/metadata-standard/schemas/entity/services/connections/database/sasConnection.md b/openmetadata-docs/content/v1.2.x/main-concepts/metadata-standard/schemas/entity/services/connections/database/sasConnection.md
new file mode 100644
index 000000000000..1bc67233a1d9
--- /dev/null
+++ b/openmetadata-docs/content/v1.2.x/main-concepts/metadata-standard/schemas/entity/services/connections/database/sasConnection.md
@@ -0,0 +1,21 @@
+---
+title: sasConnection
+slug: /main-concepts/metadata-standard/schemas/entity/services/connections/database/sasconnection
+---
+
+# SASConnection
+
+*SAS Connection Config*
+
+## Properties
+
+- **`type`**: Service Type. Refer to *#/definitions/sasType*. Default: `SAS`.
+- **`username`** *(string)*: Username to connect to SAS Viya.
+- **`password`** *(string)*: Password to connect to SAS Viya.
+- **`serverHost`** *(string)*: Hostname of SAS Viya deployment.
+## Definitions
+
+- **`sasType`** *(string)*: Service type. Must be one of: `['SAS']`. Default: `SAS`.
+
+
+Documentation file automatically generated at 2023-12-06 13:47:02.454513.
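To make the connection pieces above concrete, the snippet below is a minimal sketch of driving the new client directly from Python, which can be useful when debugging the connector outside a full workflow. The import paths and fields come from the files added in this change; the host, username, and password are placeholders that must point at a real SAS Viya deployment. Note that constructing `SASClient` immediately requests an OAuth token from `/SASLogon/oauth/token`, so the credentials have to be valid before `check_connection()` is even called.

```python
from metadata.generated.schema.entity.services.connections.database.sasConnection import (
    SASConnection,
)
from metadata.ingestion.source.database.sas.client import SASClient

# Placeholder connection values -- replace with a reachable SAS Viya deployment.
connection = SASConnection(
    username="username",
    password="password",
    serverHost="http://your-server-host.org",
    datatables=True,   # list dataset assets
    reports=False,     # skip reports
    dataflows=False,   # skip data flows
)

# Instantiating the client fetches an OAuth token right away.
client = SASClient(connection)

# Runs the same "CheckAccess" step the test-connection workflow uses:
# one catalog search per enabled asset type.
client.check_connection()

# A raw catalog search, as used by the metadata source during ingestion.
for dataset in client.list_assets("datasets"):
    print(dataset["id"], dataset["name"])
```

This is the same object graph that `get_connection()` in `connection.py` produces when the workflow runs, so anything that works here should behave identically under a scheduled ingestion.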
diff --git a/openmetadata-docs/images/connectors/sas.png b/openmetadata-docs/images/connectors/sas.png new file mode 100644 index 000000000000..275fd54d10e2 Binary files /dev/null and b/openmetadata-docs/images/connectors/sas.png differ diff --git a/openmetadata-docs/images/v1.2/connectors/sas/add-new-service.png b/openmetadata-docs/images/v1.2/connectors/sas/add-new-service.png new file mode 100644 index 000000000000..c5246e7e6d35 Binary files /dev/null and b/openmetadata-docs/images/v1.2/connectors/sas/add-new-service.png differ diff --git a/openmetadata-docs/images/v1.2/connectors/sas/select-service.png b/openmetadata-docs/images/v1.2/connectors/sas/select-service.png new file mode 100644 index 000000000000..a4cffe85fe48 Binary files /dev/null and b/openmetadata-docs/images/v1.2/connectors/sas/select-service.png differ diff --git a/openmetadata-docs/images/v1.2/connectors/sas/service-connection.png b/openmetadata-docs/images/v1.2/connectors/sas/service-connection.png new file mode 100644 index 000000000000..9dd298191472 Binary files /dev/null and b/openmetadata-docs/images/v1.2/connectors/sas/service-connection.png differ diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/metadata/sas.json b/openmetadata-service/src/main/resources/json/data/testConnections/metadata/sas.json new file mode 100644 index 000000000000..d21fe0b18fb4 --- /dev/null +++ b/openmetadata-service/src/main/resources/json/data/testConnections/metadata/sas.json @@ -0,0 +1,14 @@ +{ + "name": "SAS", + "displayName": "SAS Test Connection", + "description": "This Test Connection validates the access against the server and basic metadata extraction of tables.", + "steps": [ + { + "name": "CheckAccess", + "description": "Check if the catalog APIs are reachable with the given credentials", + "errorMessage": "Failed to connect to SAS Catalog, please validate the credentials", + "shortCircuit": true, + "mandatory": true + } + ] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/sasConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/sasConnection.json new file mode 100644 index 000000000000..78c1501fd731 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/sasConnection.json @@ -0,0 +1,102 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/database/sasConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "SASConnection", + "description": "SAS Connection Config", + "type": "object", + "javaType": "org.openmetadata.schema.services.connections.database.SASConnection", + "definitions": { + "sasType": { + "description": "Service type.", + "type": "string", + "enum": ["SAS"], + "default": "SAS" + } + }, + "properties": { + "type": { + "description": "Service Type", + "$ref": "#/definitions/sasType", + "default": "SAS" + }, + "username": { + "description": "Username to connect to SAS Viya.", + "type": "string" + }, + "password": { + "description": "Password to connect to SAS Viya", + "type": "string", + "format": "password" + }, + "serverHost": { + "description": "Hostname of SAS Viya deployment.", + "type": "string", + "format": "uri" + }, + "datatables": { + "description": "Enable datatables for ingestion", + "type": "boolean", + "default": true + }, + "dataTablesCustomFilter": { + "title": "Custom Filter for datatables", + "description": "Custom filter for datatables", 
+ "oneOf": [ + { + "title": "No Custom Filter", + "description": "Don't include custom filter when ingesting metadata for datatables", + "type": "object" + }, + { + "title": "Custom Filter", + "description": "Include custom filter when ingesting metadata for datatables", + "type": "string" + } + ] + }, + "reports": { + "description": "Enable report for ingestion", + "type": "boolean", + "default": false + }, + "reportsCustomFilter": { + "title": "Custom Filter for reports", + "description": "Custom filter for reports", + "oneOf": [ + { + "title": "No Custom Filter", + "description": "Don't include custom filter when ingesting metadata for reports", + "type": "object" + }, + { + "title": "Custom Filter", + "description": "Include custom filter when ingesting metadata for reports", + "type": "string" + } + ] + }, + "dataflows": { + "description": "Enable dataflow for ingestion", + "type": "boolean", + "default": false + }, + "dataflowsCustomFilter": { + "title": "Custom Filter for dataflows", + "description": "Custom filter for dataflows", + "oneOf": [ + { + "title": "No Custom Filter", + "description": "Don't include custom filter when ingesting metadata for dataflows", + "type": "object" + }, + { + "title": "Custom Filter", + "description": "Include custom filter when ingesting metadata for dataflows", + "type": "string" + } + ] + } + }, + "required": ["username", "password", "serverHost"], + "additionalProperties": false +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json index 81e4ecbd026a..10dfd6a93315 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json @@ -52,7 +52,8 @@ "Greenplum", "Doris", "UnityCatalog", - "DuckDb" + "DuckDb", + "SAS" ], "javaEnums": [ { @@ -168,8 +169,10 @@ }, { "name": "DuckDb" + }, + { + "name": "SAS" } - ] }, "databaseConnection": { @@ -289,6 +292,9 @@ }, { "$ref": "./connections/database/duckdbConnection.json" + }, + { + "$ref": "./connections/database/sasConnection.json" } ] } diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/SAS.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/SAS.md new file mode 100644 index 000000000000..b2c6981bdbb2 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/SAS.md @@ -0,0 +1,34 @@ +# SAS Viya + +In this section, we provide guides and references to use the SAS Viya connector. + +## Requirements +You can find further information on the Kafka connector in the [docs](https://docs.open-metadata.org/connectors/metadata/atlas). + +## Connection Details + +$$section +### Username $(id="username") + +Username to connect to SAS Viya. This user should have privileges to read all the metadata in SAS Information Catalog. +$$ + +$$section +### Password $(id="password") + +Password to connect to SAS Viya. +$$ + +$$section +### ServerHost $(id="serverHost") + +Server host and port of SAS Viya. +$$ + +$$section +### Filter $(id="filter") + +A filter expression specifying items for import. 
+For more information, see the [SAS Catalog Search API documentation](https://developer.sas.com/apis/rest/DataManagement/#catalog-search).
+
+$$
+
diff --git a/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-sas.svg b/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-sas.svg
new file mode 100644
index 000000000000..e967b541136b
--- /dev/null
+++ b/openmetadata-ui/src/main/resources/ui/src/assets/img/service-icon-sas.svg
@@ -0,0 +1,16 @@
+
+
+
+
+
diff --git a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
index 390aa62a35b9..16c3d2494151 100644
--- a/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/constants/Services.constant.ts
@@ -69,6 +69,7 @@ import redshift from '../assets/img/service-icon-redshift.png';
 import sagemaker from '../assets/img/service-icon-sagemaker.png';
 import salesforce from '../assets/img/service-icon-salesforce.png';
 import sapHana from '../assets/img/service-icon-sap-hana.png';
+import sas from '../assets/img/service-icon-sas.svg';
 import scikit from '../assets/img/service-icon-scikit.png';
 import singlestore from '../assets/img/service-icon-singlestore.png';
 import snowflakes from '../assets/img/service-icon-snowflakes.png';
@@ -164,6 +165,7 @@
 export const DAGSTER = dagster;
 export const FIVETRAN = fivetran;
 export const AMUNDSEN = amundsen;
 export const ATLAS = atlas;
+export const SAS = sas;
 export const LOGO = logo;
 export const AIRFLOW = airflow;
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts b/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts
index a5bba6c0e46e..249b6e6756b6 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/DatabaseServiceUtils.ts
@@ -44,6 +44,7 @@ import prestoConnection from '../jsons/connectionSchemas/connections/database/prestoConnection.json';
 import redshiftConnection from '../jsons/connectionSchemas/connections/database/redshiftConnection.json';
 import salesforceConnection from '../jsons/connectionSchemas/connections/database/salesforceConnection.json';
 import sapHanaConnection from '../jsons/connectionSchemas/connections/database/sapHanaConnection.json';
+import sasConnection from '../jsons/connectionSchemas/connections/database/sasConnection.json';
 import singleStoreConnection from '../jsons/connectionSchemas/connections/database/singleStoreConnection.json';
 import snowflakeConnection from '../jsons/connectionSchemas/connections/database/snowflakeConnection.json';
 import sqliteConnection from '../jsons/connectionSchemas/connections/database/sqliteConnection.json';
@@ -236,6 +237,11 @@ export const getDatabaseConfig = (type: DatabaseServiceType) => {
       break;
     }
 
+    case DatabaseServiceType.SAS: {
+      schema = sasConnection;
+
+      break;
+    }
     default: {
       schema = {};
diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
index 7a06ed092060..e3ccef2cbadc 100644
--- a/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/utils/ServiceUtilClassBase.ts
@@ -73,6 +73,7 @@ import {
   SAGEMAKER,
   SALESFORCE,
   SAP_HANA,
+  SAS,
   SCIKIT,
   SINGLESTORE,
   SNOWFLAKE,
@@ -245,6 +246,9 @@ class ServiceUtilClassBase {
       case DatabaseServiceType.MongoDB:
         return MONGODB;
 
+      case DatabaseServiceType.SAS:
+        return SAS;
+
       case DatabaseServiceType.Couchbase:
         return COUCHBASE;
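
For reviewers who want to exercise the new connection schema end to end, the sketch below wires the fields defined in `sasConnection.json` into the connector's client and runs the same reachability probe that the `CheckAccess` test-connection step describes. This is a minimal, hypothetical example and not part of the change itself: it assumes the generated `SASConnection` Pydantic model and the connector's `SASClient` are importable as added here, that a SAS Viya deployment is reachable, and that the host and credentials shown are placeholders.

```python
"""
Minimal sketch: build a SAS connection config and probe the catalog APIs.
All hosts, usernames, and passwords below are placeholders.
"""
from metadata.generated.schema.entity.services.connections.database.sasConnection import (
    SASConnection,
)
from metadata.ingestion.source.database.sas.client import SASClient

# Mirrors the properties defined in sasConnection.json; only username,
# password, and serverHost are required, and the booleans fall back to the
# schema defaults (datatables=True, reports=False, dataflows=False).
connection = SASConnection(
    username="ingestion-bot",                   # placeholder account with Information Catalog read access
    password="change-me",                       # stored as a secret by the generated model
    serverHost="https://sas-viya.example.org",  # placeholder SAS Viya host
    datatables=True,
    reports=False,
    dataflows=False,
)

# SASClient authenticates against serverHost when it is constructed, so this
# needs a reachable deployment. check_connection() lists assets for every
# enabled type, which is what the CheckAccess test-connection step validates.
client = SASClient(connection)
client.check_connection()
```

In a regular deployment the same fields are supplied through the UI form generated from `sasConnection.json` (or a workflow YAML) rather than constructed in code; the sketch is only meant to show how the schema's options map onto the connector's client.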