Skip to content

Commit

Permalink
Fix #14115: Separate Unity Catalog From Databricks
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 committed Nov 28, 2023
1 parent e2043a3 commit 7f1962c
Show file tree
Hide file tree
Showing 27 changed files with 892 additions and 588 deletions.
27 changes: 27 additions & 0 deletions ingestion/src/metadata/examples/workflows/unity_catalog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
source:
type: unitycatalog
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>

sourceConfig:
config:
type: DatabaseMetadata
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
source:
type: unitycatalog-lineage
serviceName: local_unitycatalog
sourceConfig:
config:
type: DatabaseLineage
queryLogDuration: 1
resultLimit: 10000
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
35 changes: 35 additions & 0 deletions ingestion/src/metadata/examples/workflows/unity_catalog_usage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
source:
type: unitycatalog-usage
serviceName: local_unitycatalog
serviceConnection:
config:
type: UnityCatalog
catalog: hive_metastore
databaseSchema: default
token: <databricks token>
hostPort: localhost:443
connectionTimeout: 120
connectionArguments:
http_path: <http path of databricks cluster>
sourceConfig:
config:
type: DatabaseUsage
queryLogDuration: 10
processor:
type: query-parser
config: {}
stage:
type: table-usage
config:
filename: /tmp/databricks_usage
bulkSink:
type: metadata-usage
config:
filename: /tmp/databricks_usage
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,13 @@
DatabricksConnection,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.source.database.databricks.models import (
LineageColumnStreams,
LineageTableStreams,
)
from metadata.utils.constants import QUERY_WITH_DBT, QUERY_WITH_OM_VERSION
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
API_TIMEOUT = 10
QUERIES_PATH = "/sql/history/queries"
TABLE_LINEAGE_PATH = "/lineage-tracking/table-lineage/get"
COLUMN_LINEAGE_PATH = "/lineage-tracking/column-lineage/get"


class DatabricksClient:
Expand Down Expand Up @@ -216,55 +210,3 @@ def get_job_runs(self, job_id) -> List[dict]:
logger.error(exc)

return job_runs

def get_table_lineage(self, table_name: str) -> LineageTableStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
}

response = self.client.get(
f"{self.base_url}{TABLE_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()
if response:
return LineageTableStreams(**response)

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)

return LineageTableStreams()

def get_column_lineage(
self, table_name: str, column_name: str
) -> LineageColumnStreams:
"""
Method returns table lineage details
"""
try:
data = {
"table_name": table_name,
"column_name": column_name,
}

response = self.client.get(
f"{self.base_url}{COLUMN_LINEAGE_PATH}",
headers=self.headers,
data=json.dumps(data),
timeout=API_TIMEOUT,
).json()

if response:
return LineageColumnStreams(**response)

except Exception as exc:
logger.debug(traceback.format_exc())
logger.error(exc)

return LineageColumnStreams()
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
Source connection handler
"""
from functools import partial
from typing import Optional, Union
from typing import Optional

from databricks.sdk import WorkspaceClient
from sqlalchemy.engine import Engine
from sqlalchemy.exc import DatabaseError
from sqlalchemy.inspection import inspect
Expand All @@ -37,11 +36,9 @@
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.databricks.client import DatabricksClient
from metadata.ingestion.source.database.databricks.models import DatabricksTable
from metadata.ingestion.source.database.databricks.queries import (
DATABRICKS_GET_CATALOGS,
)
from metadata.utils.db_utils import get_host_from_host_port
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
Expand All @@ -52,17 +49,11 @@ def get_connection_url(connection: DatabricksConnection) -> str:
return url


def get_connection(connection: DatabricksConnection) -> Union[Engine, WorkspaceClient]:
def get_connection(connection: DatabricksConnection) -> Engine:
"""
Create connection
"""

if connection.useUnityCatalog:
return WorkspaceClient(
host=get_host_from_host_port(connection.hostPort),
token=connection.token.get_secret_value(),
)

if connection.httpPath:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
Expand All @@ -77,7 +68,7 @@ def get_connection(connection: DatabricksConnection) -> Union[Engine, WorkspaceC

def test_connection(
metadata: OpenMetadata,
connection: Union[Engine, WorkspaceClient],
connection: Engine,
service_connection: DatabricksConnection,
automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
Expand All @@ -99,49 +90,19 @@ def test_database_query(engine: Engine, statement: str):
except DatabaseError as soe:
logger.debug(f"Failed to fetch catalogs due to: {soe}")

if service_connection.useUnityCatalog:
table_obj = DatabricksTable()

def get_catalogs(connection: WorkspaceClient, table_obj: DatabricksTable):
for catalog in connection.catalogs.list():
table_obj.catalog_name = catalog.name
break

def get_schemas(connection: WorkspaceClient, table_obj: DatabricksTable):
for schema in connection.schemas.list(catalog_name=table_obj.catalog_name):
table_obj.schema_name = schema.name
break

def get_tables(connection: WorkspaceClient, table_obj: DatabricksTable):
for table in connection.tables.list(
catalog_name=table_obj.catalog_name, schema_name=table_obj.schema_name
):
table_obj.name = table.name
break

test_fn = {
"CheckAccess": connection.catalogs.list,
"GetDatabases": partial(get_catalogs, connection, table_obj),
"GetSchemas": partial(get_schemas, connection, table_obj),
"GetTables": partial(get_tables, connection, table_obj),
"GetViews": partial(get_tables, connection, table_obj),
"GetQueries": client.test_query_api_access,
}

else:
inspector = inspect(connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_database_query,
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}
inspector = inspect(connection)
test_fn = {
"CheckAccess": partial(test_connection_engine_step, connection),
"GetSchemas": inspector.get_schema_names,
"GetTables": inspector.get_table_names,
"GetViews": inspector.get_view_names,
"GetDatabases": partial(
test_database_query,
engine=connection,
statement=DATABRICKS_GET_CATALOGS,
),
"GetQueries": client.test_query_api_access,
}

test_connection_steps(
metadata=metadata,
Expand Down

This file was deleted.

Loading

0 comments on commit 7f1962c

Please sign in to comment.