Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #14413: Add SAS connector #14415

Merged
merged 18 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docker/run_local_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,3 @@ sleep 60 # Sleep for 60 seconds to make sure the elasticsearch reindexing from U
tput setaf 2
echo "✔ OpenMetadata is up and running"


1 change: 1 addition & 0 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@
"sagemaker": {VERSIONS["boto3"]},
"salesforce": {"simple_salesforce==1.11.4"},
"sap-hana": {"hdbcli", "sqlalchemy-hana"},
"sas": {},
"singlestore": {VERSIONS["pymysql"]},
"sklearn": {VERSIONS["scikit-learn"]},
"snowflake": {VERSIONS["snowflake"]},
Expand Down
28 changes: 28 additions & 0 deletions ingestion/src/metadata/examples/workflows/sas.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Example ingestion workflow for the SAS connector.
source:
type: sas
serviceName: local_sas
serviceConnection:
config:
type: SAS
# Placeholder credentials and server URL; replace with real values.
username: username
password: password
serverHost: http://your-server-host.org
# One on/off switch per asset kind, each followed by its custom filter.
# NOTE(review): a plain `None` scalar is not YAML null, so these filter values
# are presumably loaded as the string "None" - confirm this is what the connector expects.
datatables: True
dataTablesCustomFilter: None
reports: False
reportsCustomFilter: None
dataflows: False
dataflowsCustomFilter: None
sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
# Sample JWT only - presumably to be replaced with the token of your own ingestion bot.
jwtToken: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDQ3NDY0MzYsImV4cCI6bnVsbH0.qRWByaRw3F1B0bDqdpmxDsHEUx9Npk5VNelpabuvmVERjIG8AY88p1dv5gBME6Y1-kfTtCAMtSxsll0_gTR35D1foVdXgGMRAPyNXH0JHRpENBnT1V3OVO0yRWmeqsp5K7yqiaVa-CeqxitSgYrns58BFRD_vFX5vxMHirFBrFddH7b8af8823a8Oh-wMDCuJJd7Ya61Kv6gUpssE7Y403PwomLK6pE7gsAgZ5YOyjHlQ889C9z8oLQ268BX3ndmTp6t1J7MYgOEqeIzeCoBQZf6aBdyCqYajB3TTwb3SHFz2TEYF5xeLvYlZ77ek3l22m6Ehh5d4t2jB-ZOFDUkqA
Empty file.
184 changes: 184 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/sas/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Client to interact with SAS Viya apis
"""

# pylint: disable=protected-access
import requests

from metadata.generated.schema.entity.services.connections.database.sasConnection import (
SASConnection,
)
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class SASClient:
    """
    Client to interact with SAS Information Catalog
    """

    # Maps every supported catalog index to the names of the instance
    # attributes holding its enable flag and its custom search filter.
    _ASSET_SETTINGS = {
        "datasets": ("enable_datatables", "custom_filter_datatables"),
        "reports": ("enable_reports", "custom_filter_reports"),
        "dataflows": ("enable_dataflows", "custom_filter_dataflows"),
    }

    def __init__(self, config: "SASConnection"):
        """
        Fetch an access token for the configured user and build the REST client.

        Args:
            config: SAS service connection (host, credentials, asset switches
                and custom filters).
        """
        self.config = config
        self.auth_token = get_token(
            config.serverHost, config.username, config.password.get_secret_value()
        )
        # NOTE(review): verify=False presumably disables TLS certificate
        # verification for every catalog request - confirm this is intended.
        client_config = ClientConfig(
            base_url=config.serverHost,
            auth_header="Authorization",
            auth_token=self.get_auth_token,
            api_version="",
            allow_redirects=True,
            verify=False,
        )
        self.client = REST(client_config)
        # custom setting
        self.enable_datatables = config.datatables
        self.custom_filter_datatables = config.dataTablesCustomFilter
        self.enable_reports = config.reports
        self.custom_filter_reports = config.reportsCustomFilter
        self.enable_dataflows = config.dataflows
        self.custom_filter_dataflows = config.dataflowsCustomFilter

    @staticmethod
    def _raise_on_error(response):
        """Return `response` unchanged, or raise APIError if it has an "error" key."""
        if "error" in response:
            raise APIError(response["error"])
        return response

    def check_connection(self):
        """
        Check metadata connection to SAS

        Runs one asset search for every asset type that is enabled in the
        configuration; any failure propagates to the caller.
        """
        enabled_assets = [
            asset
            for asset, (enable_attr, _) in self._ASSET_SETTINGS.items()
            if getattr(self, enable_attr)
        ]
        for asset in enabled_assets:
            self.list_assets(asset)

    def get_instance(self, instance_id):
        """
        Get the detailed catalog entry of a single instance.

        Raises:
            APIError: if the response contains an "error" key.
        """
        endpoint = f"catalog/instances/{instance_id}"
        headers = {
            "Accept": "application/vnd.sas.metadata.instance.entity.detail+json",
        }
        response = self.client._request("GET", path=endpoint, headers=headers)
        return self._raise_on_error(response)

    def get_information_catalog_link(self, instance_id):
        """Build the SAS Information Catalog details URL for an instance."""
        return f"{self.config.serverHost}/SASInformationCatalog/details/~fs~catalog~fs~instances~fs~{instance_id}"

    def list_assets(self, assets, limit=10):
        """
        Get all assets based on asset types

        Args:
            assets: catalog index to search: "datasets", "reports" or "dataflows".
            limit: value sent as the `limit` query parameter. Defaults to 10,
                the value that used to be hard-coded; pass None to omit the
                parameter from the request.

        Returns:
            The "items" entry of the search response.

        Raises:
            ValueError: if `assets` is not a supported asset type.
            APIError: if the response contains an "error" key.
        """
        # Validate first: an unknown type used to fail later with an
        # UnboundLocalError while formatting the debug message.
        if assets not in self._ASSET_SETTINGS:
            raise ValueError(
                f"Unsupported asset type: {assets}. "
                f"Expected one of {list(self._ASSET_SETTINGS)}"
            )
        enable_attr, filter_attr = self._ASSET_SETTINGS[assets]
        enable_asset = getattr(self, enable_attr)
        asset_filter = getattr(self, filter_attr)

        logger.debug(
            f"Configuration for {assets}: enable {assets} - {enable_asset}, "
            f"custom {assets} filter - {asset_filter}"
        )
        # A missing filter (None or the literal text "None") matches everything
        query = "*" if str(asset_filter) == "None" else asset_filter
        endpoint = f"catalog/search?indices={assets}&q={query}"
        if limit is not None:
            endpoint += f"&limit={limit}"
        headers = {"Accept-Item": "application/vnd.sas.metadata.instance.entity+json"}
        response = self.client._request("GET", path=endpoint, headers=headers)
        return self._raise_on_error(response)["items"]

    def get_views(self, query):
        """
        POST an instance query to the catalog and return the response.

        Raises:
            APIError: if the response contains an "error" key.
        """
        endpoint = "catalog/instances"
        headers = {
            "Content-type": "application/vnd.sas.metadata.instance.query+json",
            "Accept": "application/json",
        }
        logger.info(f"{query}")
        response = self.client._request(
            "POST", path=endpoint, data=query, headers=headers
        )
        # Unlike the other calls, the whole response is used as the error text
        if "error" in response:
            raise APIError(f"{response}")
        return response

    def get_data_source(self, endpoint):
        """
        GET a data source from the given endpoint.

        Raises:
            APIError: if the response contains an "error" key.
        """
        headers = {
            "Accept-Item": "application/vnd.sas.data.source+json",
        }
        response = self.client._request("GET", path=endpoint, headers=headers)
        logger.info(f"{response}")
        return self._raise_on_error(response)

    def get_report_link(self, resource, uri):
        """Build the links URL for `resource`, escaping every "/" in `uri` as %2F."""
        revised_uri = uri.replace("/", "%2F")
        endpoint = f"/links/resources/{resource}?uri={revised_uri}"
        return self.config.serverHost + endpoint

    def load_table(self, endpoint):
        """Send a PUT with an empty payload to `endpoint`."""
        self.client.put(path=endpoint, data={})

    def get_report_relationship(self, report_id):
        """
        Get the relationships of a report whose type is "Dependent".

        Raises:
            APIError: if the response contains an "error" key.
        """
        endpoint = f"reports/commons/relationships/reports/{report_id}"
        response = self._raise_on_error(self.client.get(endpoint))
        return [item for item in response["items"] if item["type"] == "Dependent"]

    def get_resource(self, endpoint):
        """
        GET an arbitrary endpoint and return the response.

        Raises:
            APIError: if the response contains an "error" key.
        """
        return self._raise_on_error(self.client.get(endpoint))

    def get_instances_with_param(self, data):
        """
        List catalog instances using `data` as the raw query string.

        Raises:
            APIError: if the response contains an "error" key.
        """
        endpoint = f"catalog/instances?{data}"
        response = self._raise_on_error(self.client.get(endpoint))
        return response["items"]

    def get_auth_token(self):
        """
        Token callback handed to the REST client.

        Returns the token obtained at construction time together with 0
        (presumably the expiry expected by the REST client - confirm).
        """
        return self.auth_token, 0


def get_token(base_url, user, password):
    """
    Request an OAuth access token from SAS Logon with the password grant.

    Args:
        base_url: SAS server base URL; the token endpoint path is appended to it.
        user: username to authenticate with.
        password: plaintext password of `user`. It is never logged.

    Returns:
        The "access_token" value of the JSON response.

    Raises:
        KeyError: if the response has no "access_token" entry.
    """
    endpoint = "/SASLogon/oauth/token"
    payload = {"grant_type": "password", "username": user, "password": password}
    headers = {
        "Content-type": "application/x-www-form-urlencoded",
        # Base64 of "sas.cli:" - the client id with an empty secret
        "Authorization": "Basic c2FzLmNsaTo=",
    }
    url = base_url + endpoint
    # NOTE(review): verify=False disables TLS certificate verification for
    # the request that carries the credentials - confirm this is acceptable.
    response = requests.request(
        "POST", url, headers=headers, data=payload, verify=False, timeout=10
    )
    # Parse the body once; neither the password nor the token response may
    # be written to the logs.
    token_response = response.json()
    logger.info(f"Requested SAS access token for user: {user}")
    return token_response["access_token"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.database.sasConnection import (
SASConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.sas.client import SASClient
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


def get_connection(connection: SASConnection) -> SASClient:
    """
    Build the SAS client for the given service connection
    """
    sas_client = SASClient(connection)
    return sas_client


def test_connection(
    metadata: OpenMetadata,
    client: SASClient,
    service_connection: SASConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Run the SAS connection test, whose single "CheckAccess" step calls
    the client's own connection check
    """
    steps = dict(CheckAccess=client.check_connection)
    service_type = service_connection.type.value
    test_connection_steps(
        metadata=metadata,
        test_fn=steps,
        service_type=service_type,
        automation_workflow=automation_workflow,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Extension attributes
"""

# Custom table attributes, expanded from compact (name, description, type id)
# rows into the full name / description / propertyType dictionaries.
TABLE_CUSTOM_ATTR = [
    {
        "name": attr_name,
        "description": attr_description,
        "propertyType": {"id": attr_type_id, "type": "type"},
    }
    for attr_name, attr_description, attr_type_id in (
        # Dataset attributes
        (
            "analysisTimeStamp",
            "The timestamp indicating when this object was last analyzed.",
            "STRING_TYPE",
        ),
        ("creator", "The creator/author of this object.", "STRING_TYPE"),
        ("editor", "Specifies the Person who edited the object.", "STRING_TYPE"),
        ("rowCount", "Number of rows in the data set.", "INT_TYPE"),
        ("columnCount", "Number of columns in the data set.", "INT_TYPE"),
        ("dataSize", "Size of the data set in bytes.", "INT_TYPE"),
        (
            "completenessPercent",
            "The percentage of completeness for this data set.",
            "INT_TYPE",
        ),
        ("dateCreated", "The date on which the object was created.", "STRING_TYPE"),
        (
            "dateModified",
            "The date on which the object was most recently modified.",
            "STRING_TYPE",
        ),
        (
            "source",
            "The context from which the referenced resource was obtained.",
            "STRING_TYPE",
        ),
        # SAS Table attributes
        ("CASLIB", "The name of the CAS library for this table.", "STRING_TYPE"),
        ("casHost", "The CAS host for the library for this table.", "STRING_TYPE"),
        (
            "engineName",
            "The name of the SAS data access engine used to connect to data.",
            "STRING_TYPE",
        ),
        ("casPort", "The CAS port for the library for this table.", "INT_TYPE"),
        # CAS Table attributes
        ("sourceName", "Name of the file source for this data set.", "STRING_TYPE"),
        (
            "sourceCaslib",
            "Name of the caslib source for this data set.",
            "STRING_TYPE",
        ),
    )
]
Loading
Loading