From 02daf2933b325ca2d3a9deb5179ceedbcc8c1e94 Mon Sep 17 00:00:00 2001
From: Mayuri N
Date: Thu, 14 Mar 2024 19:01:46 +0530
Subject: [PATCH 1/3] fix(ingest/databricks): support hive metastore schemas with special chars

Also adds
- exception handling to some scopes to limit impact on the rest of ingestion
- more details on the unity-catalog source
---
 .../docs/sources/databricks/README.md         |   4 +-
 .../source/unity/hive_metastore_proxy.py      | 126 ++++++++++++------
 .../ingestion/source/unity/proxy_profiling.py |  36 +++--
 .../unity/test_unity_catalog_ingest.py        |   6 +-
 4 files changed, 116 insertions(+), 56 deletions(-)

diff --git a/metadata-ingestion/docs/sources/databricks/README.md b/metadata-ingestion/docs/sources/databricks/README.md
index b380a892c22b9..2f6d68e07cdc7 100644
--- a/metadata-ingestion/docs/sources/databricks/README.md
+++ b/metadata-ingestion/docs/sources/databricks/README.md
@@ -1,12 +1,12 @@
 DataHub supports integration with Databricks ecosystem using a multitude of connectors, depending on your exact setup.
 
-## Databricks Hive
+## Databricks Hive (old)
 
 The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
 
 ## Databricks Unity Catalog (new)
 
-The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have enabled Unity Catalog, you can use the `unity-catalog` source (see below) to integrate your metadata into DataHub as an alternate to the Hive pathway.
+The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have a Unity Catalog-enabled workspace, you can use the `unity-catalog` source (also known as the `databricks` source; see below for details) to integrate your metadata into DataHub as an alternative to the Hive pathway. This source also ingests the hive metastore catalog in Databricks and is the recommended approach for ingesting the Databricks ecosystem into DataHub.
 
## Databricks Spark diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index 2a98dda1c79c5..7fa230fb74c78 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -100,14 +100,47 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]: ) def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]: - views = self.inspector.get_view_names(schema.name) + views = self.get_view_names(schema.name) for table_name in views: - yield self._get_table(schema, table_name, True) + try: + yield self._get_table(schema, table_name, True) + except Exception as e: + logger.debug( + f"Failed to get table {schema.name}.{table_name} due to {e}", + exc_info=True, + ) - for table_name in self.inspector.get_table_names(schema.name): + for table_name in self.get_table_names(schema.name): if table_name in views: continue - yield self._get_table(schema, table_name, False) + try: + yield self._get_table(schema, table_name, False) + except Exception as e: + logger.debug( + f"Failed to get table {schema.name}.{table_name} due to {e}", + exc_info=True, + ) + + def get_table_names(self, schema_name: str) -> List[str]: + try: + rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`") + # 3 columns - database, tableName, isTemporary + return [row["tableName"] for row in rows] + except Exception as e: + logger.debug( + f"Failed to get tables {schema_name} due to {e}", exc_info=True + ) + return [] + + def get_view_names(self, schema_name: str) -> List[str]: + try: + + rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") + # 3 columns - database, tableName, isTemporary + return [row["tableName"] for row in rows] + except Exception as e: + logger.debug(f"Failed to get views {schema_name} due to {e}", exc_info=True) + return [] def _get_table( self, @@ -134,9 +167,9 @@ def _get_table( columns=columns, storage_location=storage_location, data_source_format=datasource_format, - view_definition=self._get_view_definition(schema.name, table_name) - if is_view - else None, + view_definition=( + self._get_view_definition(schema.name, table_name) if is_view else None + ), properties=detailed_info, owner=None, generation=None, @@ -170,41 +203,53 @@ def get_table_profile( else {} ) + column_profiles: List[ColumnProfile] = [] + if include_column_stats: + for column in columns: + column_profile = self._get_column_profile(column.name, ref) + if column_profile: + column_profiles.append(column_profile) + return TableProfile( - num_rows=int(table_stats[ROWS]) - if table_stats.get(ROWS) is not None - else None, - total_size=int(table_stats[BYTES]) - if table_stats.get(BYTES) is not None - else None, + num_rows=( + int(table_stats[ROWS]) if table_stats.get(ROWS) is not None else None + ), + total_size=( + int(table_stats[BYTES]) if table_stats.get(BYTES) is not None else None + ), num_columns=len(columns), - column_profiles=[ - self._get_column_profile(column.name, ref) for column in columns - ] - if include_column_stats - else [], + column_profiles=column_profiles, ) - def _get_column_profile(self, column: str, ref: TableReference) -> ColumnProfile: - - props = self._column_describe_extended(ref.schema, ref.table, column) - col_stats = {} - for prop in props: - col_stats[prop[0]] = prop[1] - return ColumnProfile( - name=column, - null_count=int(col_stats[NUM_NULLS]) - if 
col_stats.get(NUM_NULLS) is not None - else None, - distinct_count=int(col_stats[DISTINCT_COUNT]) - if col_stats.get(DISTINCT_COUNT) is not None - else None, - min=col_stats.get(MIN), - max=col_stats.get(MAX), - avg_len=col_stats.get(AVG_COL_LEN), - max_len=col_stats.get(MAX_COL_LEN), - version=col_stats.get(VERSION), - ) + def _get_column_profile( + self, column: str, ref: TableReference + ) -> Optional[ColumnProfile]: + try: + props = self._column_describe_extended(ref.schema, ref.table, column) + col_stats = {} + for prop in props: + col_stats[prop[0]] = prop[1] + return ColumnProfile( + name=column, + null_count=( + int(col_stats[NUM_NULLS]) + if col_stats.get(NUM_NULLS) is not None + else None + ), + distinct_count=( + int(col_stats[DISTINCT_COUNT]) + if col_stats.get(DISTINCT_COUNT) is not None + else None + ), + min=col_stats.get(MIN), + max=col_stats.get(MAX), + avg_len=col_stats.get(AVG_COL_LEN), + max_len=col_stats.get(MAX_COL_LEN), + version=col_stats.get(VERSION), + ) + except Exception as e: + logger.debug(f"Failed to get column profile for {ref}.{column} due to {e}") + return None def _get_cached_table_statistics(self, statistics: str) -> dict: # statistics is in format "xx bytes" OR "1382 bytes, 2 rows" @@ -242,9 +287,10 @@ def _get_view_definition(self, schema_name: str, table_name: str) -> Optional[st ) for row in rows: return row[0] - except Exception: + except Exception as e: logger.debug( - f"Failed to get view definition for {schema_name}.{table_name}" + f"Failed to get view definition for {schema_name}.{table_name} due to {e}", + exc_info=True, ) return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py index 5992f103ccac3..eca076156f48a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py @@ -111,6 +111,12 @@ def get_table_stats( ) else: return None + except Exception as e: + logger.debug(f"Failed to get table stats due to {e}", exc_info=True) + self.report.profile_table_errors.setdefault( + "miscellaneous errors", LossyList() + ).append((str(ref), str(e))) + return None def _should_retry_unsupported_column( self, ref: TableReference, e: DatabricksError @@ -185,12 +191,14 @@ def _create_table_profile( num_rows=self._get_int(table_info, "spark.sql.statistics.numRows"), total_size=self._get_int(table_info, "spark.sql.statistics.totalSize"), num_columns=len(columns_names), - column_profiles=[ - self._create_column_profile(column, table_info) - for column in columns_names - ] - if include_columns - else [], + column_profiles=( + [ + self._create_column_profile(column, table_info) + for column in columns_names + ] + if include_columns + else [] + ), ) def _create_column_profile( @@ -237,12 +245,16 @@ def _raise_if_error( StatementState.CLOSED, ]: raise DatabricksError( - response.status.error.message - if response.status.error and response.status.error.message - else "Unknown Error", - error_code=response.status.error.error_code.value - if response.status.error and response.status.error.error_code - else "Unknown Error Code", + ( + response.status.error.message + if response.status.error and response.status.error.message + else "Unknown Error" + ), + error_code=( + response.status.error.error_code.value + if response.status.error and response.status.error.error_code + else "Unknown Error Code" + ), status=response.status.state.value, context=key, ) 
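The heart of this first patch is swapping the SQLAlchemy inspector lookups for explicit back-quoted `SHOW TABLES FROM` / `SHOW VIEWS FROM` statements, so hive_metastore schemas whose names contain special characters (for example hyphens) resolve correctly, with failures degrading to an empty list instead of aborting the rest of ingestion. Below is a minimal standalone sketch of that pattern, not the actual module: `execute_sql` stands in for the proxy's `_execute_sql`, and rows are assumed to expose a `tableName` attribute, the access style the later patches in this series settle on.

```python
import logging
from typing import Callable, List, Sequence

logger = logging.getLogger(__name__)


def list_hive_tables(
    execute_sql: Callable[[str], Sequence], schema_name: str
) -> List[str]:
    """Illustrative sketch: back-quote the schema so names with hyphens survive."""
    try:
        rows = execute_sql(f"SHOW TABLES FROM `{schema_name}`")
        # SHOW TABLES returns 3 columns: database, tableName, isTemporary
        return [row.tableName for row in rows]
    except Exception:
        # Degrade to an empty list so one bad schema does not abort the whole run.
        logger.debug("Failed to list tables for %s", schema_name, exc_info=True)
        return []
```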
diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index 05f1db0b932f8..b4c93737ccc88 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -371,6 +371,10 @@ def mock_hive_sql(query): "CREATE VIEW `hive_metastore`.`bronze_kambi`.`view1` AS SELECT * FROM `hive_metastore`.`bronze_kambi`.`bet`", ) ] + elif query == "SHOW TABLES FROM `bronze_kambi`": + return [("bet",), ("view1",)] + elif query == "SHOW VIEWS FROM `bronze_kambi`": + return [("view1",)] return [] @@ -392,8 +396,6 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock): inspector = mock.MagicMock() inspector.get_schema_names.return_value = ["bronze_kambi"] - inspector.get_view_names.return_value = ["view1"] - inspector.get_table_names.return_value = ["bet", "view1"] get_inspector.return_value = inspector execute_sql.side_effect = mock_hive_sql From 821f1f9ce6a023f524a0880eb2cef446972707ca Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Fri, 15 Mar 2024 11:49:55 +0530 Subject: [PATCH 2/3] minor tweaks --- .../source/unity/hive_metastore_proxy.py | 4 +- .../datahub/ingestion/source/unity/proxy.py | 53 +++++++++++-------- .../datahub/ingestion/source/unity/usage.py | 1 + .../unity/test_unity_catalog_ingest.py | 11 +++- 4 files changed, 43 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index 7fa230fb74c78..323b0af8acadf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -125,7 +125,7 @@ def get_table_names(self, schema_name: str) -> List[str]: try: rows = self._execute_sql(f"SHOW TABLES FROM `{schema_name}`") # 3 columns - database, tableName, isTemporary - return [row["tableName"] for row in rows] + return [row.tableName for row in rows] except Exception as e: logger.debug( f"Failed to get tables {schema_name} due to {e}", exc_info=True @@ -137,7 +137,7 @@ def get_view_names(self, schema_name: str) -> List[str]: rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") # 3 columns - database, tableName, isTemporary - return [row["tableName"] for row in rows] + return [row.tableName for row in rows] except Exception as e: logger.debug(f"Failed to get views {schema_name} due to {e}", exc_info=True) return [] diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 20aa10305fa8f..1e90f3a044f42 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -1,6 +1,7 @@ """ Manage the communication with DataBricks Server and provide equivalent dataclasses for dependent modules """ + import dataclasses import logging from datetime import datetime, timezone @@ -204,16 +205,16 @@ def workspace_notebooks(self) -> Iterable[Notebook]: id=obj.object_id, path=obj.path, language=obj.language, - created_at=datetime.fromtimestamp( - obj.created_at / 1000, tz=timezone.utc - ) - if obj.created_at - else None, - modified_at=datetime.fromtimestamp( - obj.modified_at / 1000, tz=timezone.utc - ) - if obj.modified_at - else None, + created_at=( + datetime.fromtimestamp(obj.created_at 
/ 1000, tz=timezone.utc) + if obj.created_at + else None + ), + modified_at=( + datetime.fromtimestamp(obj.modified_at / 1000, tz=timezone.utc) + if obj.modified_at + else None + ), ) def query_history( @@ -268,12 +269,14 @@ def _query_history( response: dict = self._workspace_client.api_client.do( # type: ignore method, path, body={**body, "filter_by": filter_by.as_dict()} ) - # we use default raw=False in above request, therefore will always get dict + # we use default raw=False(default) in above request, therefore will always get dict while True: if "res" not in response or not response["res"]: return for v in response["res"]: yield QueryInfo.from_dict(v) + if not response.get("next_page_token"): # last page + return response = self._workspace_client.api_client.do( # type: ignore method, path, body={**body, "page_token": response["next_page_token"]} ) @@ -434,22 +437,28 @@ def _create_table( schema=schema, storage_location=obj.storage_location, data_source_format=obj.data_source_format, - columns=list(self._extract_columns(obj.columns, table_id)) - if obj.columns - else [], + columns=( + list(self._extract_columns(obj.columns, table_id)) + if obj.columns + else [] + ), view_definition=obj.view_definition or None, properties=obj.properties or {}, owner=obj.owner, generation=obj.generation, - created_at=datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc) - if obj.created_at - else None, + created_at=( + datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc) + if obj.created_at + else None + ), created_by=obj.created_by, - updated_at=datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc) - if obj.updated_at - else None - if obj.updated_at - else None, + updated_at=( + datetime.fromtimestamp(obj.updated_at / 1000, tz=timezone.utc) + if obj.updated_at + else None + if obj.updated_at + else None + ), updated_by=obj.updated_by, table_id=obj.table_id, comment=obj.comment, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index f07e7a92d8762..8d81f22fd1b04 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -147,6 +147,7 @@ def _get_queries(self) -> Iterable[Query]: self.config.start_time, self.config.end_time ) except Exception as e: + breakpoint() logger.warning("Error getting queries", exc_info=True) self.report.report_warning("get-queries", str(e)) diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index b4c93737ccc88..eccf9bea3b3dc 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -1,4 +1,5 @@ import uuid +from collections import namedtuple from unittest import mock from unittest.mock import patch @@ -272,6 +273,9 @@ def register_mock_data(workspace_client): ] +TableEntry = namedtuple("TableEntry", ["database", "tableName", "isTemporary"]) + + def mock_hive_sql(query): if query == "DESCRIBE EXTENDED `bronze_kambi`.`bet` betStatusId": @@ -372,9 +376,12 @@ def mock_hive_sql(query): ) ] elif query == "SHOW TABLES FROM `bronze_kambi`": - return [("bet",), ("view1",)] + return [ + TableEntry("bronze_kambi", "bet", False), + TableEntry("bronze_kambi", "view1", False), + ] elif query == "SHOW VIEWS FROM `bronze_kambi`": - return [("view1",)] + return 
[TableEntry("bronze_kambi", "view1", False)]
 
     return []


From f15e76ea5ae1d092d239cad0119e8ba588b78ef1 Mon Sep 17 00:00:00 2001
From: Mayuri N
Date: Wed, 27 Mar 2024 13:02:55 +0530
Subject: [PATCH 3/3] emit placeholder table even if error in fetching details

---
 .../docs/sources/databricks/README.md         |   9 +-
 .../source/unity/hive_metastore_proxy.py      | 169 ++++++++++--------
 .../ingestion/source/unity/proxy_profiling.py |   8 +-
 .../datahub/ingestion/source/unity/source.py  |   4 +-
 .../datahub/ingestion/source/unity/usage.py   |   1 -
 .../unity/test_unity_catalog_ingest.py        |   5 +
 .../unity/unity_catalog_mces_golden.json      | 143 +++++++++++++++
 7 files changed, 253 insertions(+), 86 deletions(-)

diff --git a/metadata-ingestion/docs/sources/databricks/README.md b/metadata-ingestion/docs/sources/databricks/README.md
index 2f6d68e07cdc7..32b0b20c9480b 100644
--- a/metadata-ingestion/docs/sources/databricks/README.md
+++ b/metadata-ingestion/docs/sources/databricks/README.md
@@ -1,13 +1,14 @@
 DataHub supports integration with Databricks ecosystem using a multitude of connectors, depending on your exact setup.
 
-## Databricks Hive (old)
-
-The simplest way to integrate is usually via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
-
 ## Databricks Unity Catalog (new)
 
 The recently introduced [Unity Catalog](https://www.databricks.com/product/unity-catalog) provides a new way to govern your assets within the Databricks lakehouse. If you have a Unity Catalog-enabled workspace, you can use the `unity-catalog` source (also known as the `databricks` source; see below for details) to integrate your metadata into DataHub as an alternative to the Hive pathway. This source also ingests the hive metastore catalog in Databricks and is the recommended approach for ingesting the Databricks ecosystem into DataHub.
 
+## Databricks Hive (old)
+
+The alternative way to integrate is via the Hive connector. The [Hive starter recipe](http://datahubproject.io/docs/generated/ingestion/sources/hive#starter-recipe) has a section describing how to connect to your Databricks workspace.
+
+
 ## Databricks Spark
 
 To complete the picture, we recommend adding push-based ingestion from your Spark jobs to see real-time activity and lineage between your Databricks tables and your Spark jobs. Use the Spark agent to push metadata to DataHub using the instructions [here](../../../../metadata-integration/java/spark-lineage/README.md#configuration-instructions-databricks).
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
index 323b0af8acadf..140698a6c4b10 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py
@@ -21,6 +21,7 @@
     TableProfile,
     TableReference,
 )
+from datahub.ingestion.source.unity.report import UnityCatalogReport
 
 logger = logging.getLogger(__name__)
 HIVE_METASTORE = "hive_metastore"
@@ -66,9 +67,12 @@ class HiveMetastoreProxy(Closeable):
     as unity catalog apis do not return details about this legacy metastore.
""" - def __init__(self, sqlalchemy_url: str, options: dict) -> None: + def __init__( + self, sqlalchemy_url: str, options: dict, report: UnityCatalogReport + ) -> None: try: self.inspector = HiveMetastoreProxy.get_inspector(sqlalchemy_url, options) + self.report = report except Exception: # This means that there is no `hive_metastore` catalog in databricks workspace # Not tested but seems like the logical conclusion. @@ -100,26 +104,19 @@ def hive_metastore_schemas(self, catalog: Catalog) -> Iterable[Schema]: ) def hive_metastore_tables(self, schema: Schema) -> Iterable[Table]: + # NOTE: Ideally, we use `inspector.get_view_names` and `inspector.get_table_names` here instead of + # making show queries in this class however Databricks dialect for databricks-sql-connector<3.0.0 does not + # back-quote schemas with special char such as hyphen. + # Currently, databricks-sql-connector is pinned to <3.0.0 due to requirement of SQLAlchemy > 2.0.21 for + # later versions. views = self.get_view_names(schema.name) for table_name in views: - try: - yield self._get_table(schema, table_name, True) - except Exception as e: - logger.debug( - f"Failed to get table {schema.name}.{table_name} due to {e}", - exc_info=True, - ) + yield self._get_table(schema, table_name, True) for table_name in self.get_table_names(schema.name): if table_name in views: continue - try: - yield self._get_table(schema, table_name, False) - except Exception as e: - logger.debug( - f"Failed to get table {schema.name}.{table_name} due to {e}", - exc_info=True, - ) + yield self._get_table(schema, table_name, False) def get_table_names(self, schema_name: str) -> List[str]: try: @@ -127,19 +124,24 @@ def get_table_names(self, schema_name: str) -> List[str]: # 3 columns - database, tableName, isTemporary return [row.tableName for row in rows] except Exception as e: - logger.debug( + self.report.report_warning( + "Failed to get tables for schema", f"{HIVE_METASTORE}.{schema_name}" + ) + logger.warning( f"Failed to get tables {schema_name} due to {e}", exc_info=True ) return [] def get_view_names(self, schema_name: str) -> List[str]: try: - rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") # 3 columns - database, tableName, isTemporary return [row.tableName for row in rows] except Exception as e: - logger.debug(f"Failed to get views {schema_name} due to {e}", exc_info=True) + self.report.report_warning("Failed to get views for schema", schema_name) + logger.warning( + f"Failed to get views {schema_name} due to {e}", exc_info=True + ) return [] def _get_table( @@ -148,7 +150,7 @@ def _get_table( table_name: str, is_view: bool = False, ) -> Table: - columns = self._get_columns(schema, table_name) + columns = self._get_columns(schema.name, table_name) detailed_info = self._get_table_info(schema.name, table_name) comment = detailed_info.pop("Comment", None) @@ -183,20 +185,16 @@ def _get_table( def get_table_profile( self, ref: TableReference, include_column_stats: bool = False - ) -> TableProfile: + ) -> Optional[TableProfile]: columns = self._get_columns( - Schema( - id=ref.schema, - name=ref.schema, - # This is okay, as none of this is used in profiling - catalog=self.hive_metastore_catalog(None), - comment=None, - owner=None, - ), + ref.schema, ref.table, ) detailed_info = self._get_table_info(ref.schema, ref.table) + if not columns and not detailed_info: + return None + table_stats = ( self._get_cached_table_statistics(detailed_info["Statistics"]) if detailed_info.get("Statistics") @@ -288,6 +286,10 @@ def 
_get_view_definition(self, schema_name: str, table_name: str) -> Optional[st for row in rows: return row[0] except Exception as e: + self.report.report_warning( + "Failed to get view definition for table", + f"{HIVE_METASTORE}.{schema_name}.{table_name}", + ) logger.debug( f"Failed to get view definition for {schema_name}.{table_name} due to {e}", exc_info=True, @@ -304,60 +306,81 @@ def _get_table_type(self, type: Optional[str]) -> HiveTableType: else: return HiveTableType.UNKNOWN + @lru_cache(maxsize=1) def _get_table_info(self, schema_name: str, table_name: str) -> dict: - rows = self._describe_extended(schema_name, table_name) - - index = rows.index(("# Detailed Table Information", "", "")) - rows = rows[index + 1 :] - # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375 # Generate properties dictionary. properties = {} - active_heading = None - for col_name, data_type, value in rows: - col_name = col_name.rstrip() - if col_name.startswith("# "): - continue - elif col_name == "" and data_type is None: - active_heading = None - continue - elif col_name != "" and data_type is None: - active_heading = col_name - elif col_name != "" and data_type is not None: - properties[col_name] = data_type.strip() - else: - # col_name == "", data_type is not None - prop_name = "{} {}".format(active_heading, data_type.rstrip()) - properties[prop_name] = value.rstrip() + try: + rows = self._describe_extended(schema_name, table_name) + + index = rows.index(("# Detailed Table Information", "", "")) + rows = rows[index + 1 :] + # Copied from https://github.com/acryldata/PyHive/blob/master/pyhive/sqlalchemy_hive.py#L375 + + active_heading = None + for col_name, data_type, value in rows: + col_name = col_name.rstrip() + if col_name.startswith("# "): + continue + elif col_name == "" and data_type is None: + active_heading = None + continue + elif col_name != "" and data_type is None: + active_heading = col_name + elif col_name != "" and data_type is not None: + properties[col_name] = data_type.strip() + else: + # col_name == "", data_type is not None + prop_name = "{} {}".format(active_heading, data_type.rstrip()) + properties[prop_name] = value.rstrip() + except Exception as e: + self.report.report_warning( + "Failed to get detailed info for table", + f"{HIVE_METASTORE}.{schema_name}.{table_name}", + ) + logger.debug( + f"Failed to get detailed info for table {schema_name}.{table_name} due to {e}", + exc_info=True, + ) return properties - def _get_columns(self, schema: Schema, table_name: str) -> List[Column]: - rows = self._describe_extended(schema.name, table_name) - + @lru_cache(maxsize=1) + def _get_columns(self, schema_name: str, table_name: str) -> List[Column]: columns: List[Column] = [] - for i, row in enumerate(rows): - if i == 0 and row[0].strip() == "col_name": - continue # first row - if row[0].strip() in ( - "", - "# Partition Information", - "# Detailed Table Information", - ): - break - columns.append( - Column( - name=row[0].strip(), - id=f"{schema.id}.{table_name}.{row[0].strip()}", - type_text=row[1].strip(), - type_name=type_map.get(row[1].strip().lower()), - type_scale=None, - type_precision=None, - position=None, - nullable=None, - comment=row[2], + try: + rows = self._describe_extended(schema_name, table_name) + for i, row in enumerate(rows): + if i == 0 and row[0].strip() == "col_name": + continue # first row + if row[0].strip() in ( + "", + "# Partition Information", + "# Detailed Table Information", + ): + break + columns.append( + Column( + 
name=row[0].strip(), + id=f"{HIVE_METASTORE}.{schema_name}.{table_name}.{row[0].strip()}", + type_text=row[1].strip(), + type_name=type_map.get(row[1].strip().lower()), + type_scale=None, + type_precision=None, + position=None, + nullable=None, + comment=row[2], + ) ) + except Exception as e: + self.report.report_warning( + "Failed to get columns for table", + f"{HIVE_METASTORE}.{schema_name}.{table_name}", + ) + logger.debug( + f"Failed to get columns for table {schema_name}.{table_name} due to {e}", + exc_info=True, ) - return columns @lru_cache(maxsize=1) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py index eca076156f48a..5d6d2bec6d2fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_profiling.py @@ -111,12 +111,6 @@ def get_table_stats( ) else: return None - except Exception as e: - logger.debug(f"Failed to get table stats due to {e}", exc_info=True) - self.report.profile_table_errors.setdefault( - "miscellaneous errors", LossyList() - ).append((str(ref), str(e))) - return None def _should_retry_unsupported_column( self, ref: TableReference, e: DatabricksError @@ -171,7 +165,7 @@ def _check_analyze_table_statement_status( def _get_table_profile( self, ref: TableReference, include_columns: bool - ) -> TableProfile: + ) -> Optional[TableProfile]: if self.hive_metastore_proxy and ref.catalog == HIVE_METASTORE: return self.hive_metastore_proxy.get_table_profile(ref, include_columns) table_info = self._workspace_client.tables.get(ref.qualified_table_name) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 9a326ec584d21..2008991dad72e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -213,7 +213,9 @@ def init_hive_metastore_proxy(self): if self.config.include_hive_metastore: try: self.hive_metastore_proxy = HiveMetastoreProxy( - self.config.get_sql_alchemy_url(HIVE_METASTORE), self.config.options + self.config.get_sql_alchemy_url(HIVE_METASTORE), + self.config.options, + self.report, ) self.report.hive_metastore_catalog_found = True diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index 8d81f22fd1b04..f07e7a92d8762 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -147,7 +147,6 @@ def _get_queries(self) -> Iterable[Query]: self.config.start_time, self.config.end_time ) except Exception as e: - breakpoint() logger.warning("Error getting queries", exc_info=True) self.report.report_warning("get-queries", str(e)) diff --git a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py index eb69c30a36412..f22e15da45df2 100644 --- a/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py +++ b/metadata-ingestion/tests/integration/unity/test_unity_catalog_ingest.py @@ -371,6 +371,10 @@ def mock_hive_sql(query): ("Type", "VIEW", ""), ("Owner", "root", ""), ] + elif query == "DESCRIBE EXTENDED `bronze_kambi`.`delta_error_table`": + raise Exception( + "[DELTA_PATH_DOES_NOT_EXIST] doesn't exist, or is not a 
Delta table." + ) elif query == "SHOW CREATE TABLE `bronze_kambi`.`view1`": return [ ( @@ -380,6 +384,7 @@ def mock_hive_sql(query): elif query == "SHOW TABLES FROM `bronze_kambi`": return [ TableEntry("bronze_kambi", "bet", False), + TableEntry("bronze_kambi", "delta_error_table", False), TableEntry("bronze_kambi", "view1", False), ] elif query == "SHOW VIEWS FROM `bronze_kambi`": diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index 1f0193fef6063..f01878fed1353 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -1394,6 +1394,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:21058fb6993a790a4a43727021e52956" + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -1417,6 +1433,74 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "add", + "path": "/name", + "value": "delta_error_table" + }, + { + "op": "add", + "path": "/qualifiedName", + "value": "hive_metastore.bronze_kambi.delta_error_table" + }, + { + "op": "add", + "path": "/customProperties/table_type", + "value": "UNKNOWN" + }, + { + "op": "add", + "path": "/customProperties/table_id", + "value": "hive_metastore.bronze_kambi.delta_error_table" + } + ] + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "hive_metastore.bronze_kambi.delta_error_table", + "platform": "urn:li:dataPlatform:databricks", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -1433,6 +1517,24 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Table" + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": 
"urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -1465,6 +1567,31 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586", + "urn": "urn:li:container:d91b261e5da1bf1434c6318b8c2ac586" + }, + { + "id": "urn:li:container:21058fb6993a790a4a43727021e52956", + "urn": "urn:li:container:21058fb6993a790a4a43727021e52956" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "container", "entityUrn": "urn:li:container:730e95cd0271453376b3c1d9623838d6", @@ -2402,6 +2529,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.delta_error_table,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)",