From 27f23ecdd5d3635ac32ed51a10a339ee3e4870b3 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 6 Dec 2023 13:59:23 -0500 Subject: [PATCH] feat(ingest/unity): GE Profiling (#8951) --- docs/how/updating-datahub.md | 4 + .../sources/databricks/unity-catalog_pre.md | 3 +- .../databricks/unity-catalog_recipe.yml | 52 ++++-- metadata-ingestion/setup.py | 5 +- .../ingestion/source/bigquery_v2/profiler.py | 2 +- .../ingestion/source/ge_data_profiler.py | 24 ++- .../ingestion/source/redshift/profile.py | 3 +- .../source/snowflake/snowflake_profiler.py | 3 +- .../source/sql/sql_generic_profiler.py | 25 +-- .../{profiler.py => analyze_profiler.py} | 6 +- .../datahub/ingestion/source/unity/config.py | 78 ++++++-- .../ingestion/source/unity/ge_profiler.py | 170 ++++++++++++++++++ .../datahub/ingestion/source/unity/report.py | 7 +- .../datahub/ingestion/source/unity/source.py | 49 +++-- .../mysql/mysql_mces_no_db_golden.json | 27 +-- .../mysql/mysql_mces_with_db_golden.json | 73 ++++---- .../mysql_table_row_count_estimate_only.json | 121 ++++--------- .../tests/unit/test_unity_catalog_config.py | 8 +- 18 files changed, 449 insertions(+), 211 deletions(-) rename metadata-ingestion/src/datahub/ingestion/source/unity/{profiler.py => analyze_profiler.py} (96%) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index df179b0d0d2f7..94ab1b0611c33 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -12,6 +12,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe - #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the signature for the director constructor e.g. `TagUrn("tag", ["tag_name"])` is no longer supported, and the simpler `TagUrn("tag_name")` should be used instead. The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn` are retained for backwards compatibility, but are considered deprecated. - #9286: The `DataHubRestEmitter.emit` method no longer returns anything. It previously returned a tuple of timestamps. +- #8951: A great expectations based profiler has been added for the Unity Catalog source. +To use the old profiler, set `method: analyze` under the `profiling` section in your recipe. +To use the new profiler, set `method: ge`. Profiling is disabled by default, so to enable it, +one of these methods must be specified. ### Potential Downtime diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md index ae2883343d7e8..12540e1977f64 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md @@ -15,7 +15,8 @@ * [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html) + To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions). 
+ To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html). - + To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile. + + To ingest `profiling` information with `method: ge`, you need `SELECT` privileges on all profiled tables. + + To ingest `profiling` information with `method: analyze` and `call_analyze: true` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile. * Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`. You will still need `SELECT` privilege on those tables to fetch the results. - Check the starter recipe below and replace `workspace_url` and `token` with your information from the previous steps. diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml b/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml index 7bc336d5f25fc..931552e7343d0 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_recipe.yml @@ -2,24 +2,38 @@ source: type: unity-catalog config: workspace_url: https://my-workspace.cloud.databricks.com - token: "mygenerated_databricks_token" - #metastore_id_pattern: - # deny: - # - 11111-2222-33333-44-555555 - #catalog_pattern: - # allow: - # - my-catalog - #schema_pattern: - # deny: - # - information_schema - #table_pattern: - # allow: - # - test.lineagedemo.dinner - # First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions - #domain: - # urn:li:domain:1111-222-333-444-555: - # allow: - # - main.* + token: "" + include_metastore: false + include_ownership: true + profiling: + method: "ge" + enabled: true + warehouse_id: "" + profile_table_level_only: false + max_wait_secs: 60 + pattern: + deny: + - ".*\\.unwanted_schema" + +# profiling: +# method: "analyze" +# enabled: true +# warehouse_id: "" +# profile_table_level_only: true +# call_analyze: true + +# catalogs: ["my_catalog"] +# schema_pattern: +# deny: +# - information_schema +# table_pattern: +# allow: +# - my_catalog.my_schema.my_table +# First you have to create domains on Datahub by following this guide -> https://datahubproject.io/docs/domains/#domains-setup-prerequisites-and-permissions +# domain: +# urn:li:domain:1111-222-333-444-555: +# allow: +# - main.* stateful_ingestion: enabled: true @@ -27,4 +41,4 @@ source: pipeline_name: acme-corp-unity -# sink configs if needed \ No newline at end of file +# sink configs if needed diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 69cbe8d823450..dac865d2dac37 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -262,7 +262,8 @@ "databricks-sdk>=0.9.0", "pyspark~=3.3.0", "requests", - "databricks-sql-connector", + # Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes + "databricks-sql-connector>=2.8.0", } mysql = sql_common | {"pymysql>=1.0.2"} @@ -393,7 +394,7 @@ "powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | 
sqlglot_lib, "powerbi-report-server": powerbi_report_server, "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"}, - "unity-catalog": databricks | sqllineage_lib, + "unity-catalog": databricks | sql_common | sqllineage_lib, "fivetran": snowflake_common, } diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index 8ae17600e0eea..4083eb6db77c1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -183,7 +183,7 @@ def get_workunits( return yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c334a97680e3e..abb415c90cc8b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -27,6 +27,7 @@ import sqlalchemy as sa import sqlalchemy.sql.compiler +from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping from great_expectations.core.util import convert_to_json_serializable from great_expectations.data_context import AbstractDataContext, BaseDataContext from great_expectations.data_context.types.base import ( @@ -77,8 +78,26 @@ SNOWFLAKE = "snowflake" BIGQUERY = "bigquery" REDSHIFT = "redshift" +DATABRICKS = "databricks" TRINO = "trino" +# Type names for Databricks, to match Title Case types in sqlalchemy +ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer") +ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger") +ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger") +ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float") +ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("String") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode") +ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText") +ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time") +ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval") +ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary") + # The reason for this wacky structure is quite fun. GE basically assumes that # the config structures were generated directly from YML and further assumes that # they can be `deepcopy`'d without issue. 
The SQLAlchemy engine and connection @@ -697,6 +716,9 @@ def generate_dataset_profile( # noqa: C901 (complexity) 1, unique_count / non_null_count ) + if not profile.rowCount: + continue + self._get_dataset_column_sample_values(column_profile, column) if ( @@ -1172,7 +1194,7 @@ def _get_ge_dataset( }, ) - if platform == BIGQUERY: + if platform == BIGQUERY or platform == DATABRICKS: # This is done as GE makes the name as DATASET.TABLE # but we want it to be PROJECT.DATASET.TABLE instead for multi-project setups name_parts = pretty_name.split(".") diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py index 771636e8498a3..6fa3504ced139 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py @@ -59,8 +59,7 @@ def get_workunits( yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, - db, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py index 8e18d85d6f3ca..67953de47e5a3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py @@ -62,8 +62,7 @@ def get_workunits( yield from self.generate_profile_workunits( profile_requests, - self.config.profiling.max_workers, - database.name, + max_workers=self.config.profiling.max_workers, platform=self.platform, profiler_args=self.get_profile_args(), ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index aaeee5717a867..e309ff0d15311 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -69,8 +69,8 @@ def __init__( def generate_profile_workunits( self, requests: List[TableProfilerRequest], + *, max_workers: int, - db_name: Optional[str] = None, platform: Optional[str] = None, profiler_args: Optional[Dict] = None, ) -> Iterable[MetadataWorkUnit]: @@ -98,7 +98,7 @@ def generate_profile_workunits( return # Otherwise, if column level profiling is enabled, use GE profiler. 
- ge_profiler = self.get_profiler_instance(db_name) + ge_profiler = self.get_profiler_instance() for ge_profiler_request, profile in ge_profiler.generate_profiles( ge_profile_requests, max_workers, platform, profiler_args @@ -149,12 +149,18 @@ def get_profile_request( profile_table_level_only = self.config.profiling.profile_table_level_only dataset_name = self.get_dataset_name(table.name, schema_name, db_name) if not self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, table.size_in_bytes, table.rows_count + dataset_name, + last_altered=table.last_altered, + size_in_bytes=table.size_in_bytes, + rows_count=table.rows_count, ): # Profile only table level if dataset is filtered from profiling # due to size limits alone if self.is_dataset_eligible_for_profiling( - dataset_name, table.last_altered, 0, 0 + dataset_name, + last_altered=table.last_altered, + size_in_bytes=None, + rows_count=None, ): profile_table_level_only = True else: @@ -199,9 +205,7 @@ def get_inspectors(self) -> Iterable[Inspector]: inspector = inspect(conn) yield inspector - def get_profiler_instance( - self, db_name: Optional[str] = None - ) -> "DatahubGEProfiler": + def get_profiler_instance(self) -> "DatahubGEProfiler": logger.debug(f"Getting profiler instance from {self.platform}") url = self.config.get_sql_alchemy_url() @@ -221,9 +225,10 @@ def get_profiler_instance( def is_dataset_eligible_for_profiling( self, dataset_name: str, - last_altered: Optional[datetime], - size_in_bytes: Optional[int], - rows_count: Optional[int], + *, + last_altered: Optional[datetime] = None, + size_in_bytes: Optional[int] = None, + rows_count: Optional[int] = None, ) -> bool: dataset_urn = make_dataset_urn_with_platform_instance( self.platform, diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py similarity index 96% rename from metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py rename to metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py index 8066932e3afe9..4c8b22f2399b2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/analyze_profiler.py @@ -6,7 +6,7 @@ from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig +from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy from datahub.ingestion.source.unity.proxy_types import ( ColumnProfile, @@ -23,8 +23,8 @@ @dataclass -class UnityCatalogProfiler: - config: UnityCatalogProfilerConfig +class UnityCatalogAnalyzeProfiler: + config: UnityCatalogAnalyzeProfilerConfig report: UnityCatalogReport proxy: UnityCatalogApiProxy dataset_urn_builder: Callable[[TableReference], str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 4e3deedddbc43..2c567120b4850 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -1,10 +1,12 @@ import logging import os from datetime import datetime, timedelta, timezone -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union +from urllib.parse import 
urlparse import pydantic from pydantic import Field +from typing_extensions import Literal from datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import ( @@ -13,6 +15,9 @@ ) from datahub.configuration.validate_field_removal import pydantic_removed_field from datahub.configuration.validate_field_rename import pydantic_renamed_field +from datahub.ingestion.source.ge_data_profiler import DATABRICKS +from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri from datahub.ingestion.source.state.stale_entity_removal_handler import ( StatefulStaleMetadataRemovalConfig, ) @@ -31,24 +36,20 @@ class UnityCatalogProfilerConfig(ConfigModel): - # TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig - enabled: bool = Field( - default=False, description="Whether profiling should be done." - ) - operation_config: OperationConfig = Field( - default_factory=OperationConfig, - description="Experimental feature. To specify operation configs.", + method: str = Field( + description=( + "Profiling method to use." + " Options supported are `ge` and `analyze`." + " `ge` uses Great Expectations and runs SELECT SQL queries on profiled tables." + " `analyze` calls ANALYZE TABLE on profiled tables. Only works for delta tables." + ), ) + # TODO: Support cluster compute as well, for ge profiling warehouse_id: Optional[str] = Field( default=None, description="SQL Warehouse id, for running profiling queries." ) - profile_table_level_only: bool = Field( - default=False, - description="Whether to perform profiling at table-level only or include column-level profiling as well.", - ) - pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description=( @@ -58,6 +59,24 @@ class UnityCatalogProfilerConfig(ConfigModel): ), ) + +class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig): + method: Literal["analyze"] = "analyze" + + # TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig + enabled: bool = Field( + default=False, description="Whether profiling should be done." + ) + operation_config: OperationConfig = Field( + default_factory=OperationConfig, + description="Experimental feature. 
To specify operation configs.", + ) + + profile_table_level_only: bool = Field( + default=False, + description="Whether to perform profiling at table-level only or include column-level profiling as well.", + ) + call_analyze: bool = Field( default=True, description=( @@ -89,7 +108,17 @@ def include_columns(self): return not self.profile_table_level_only +class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig): + method: Literal["ge"] = "ge" + + max_wait_secs: Optional[int] = Field( + default=None, + description="Maximum time to wait for a table to be profiled.", + ) + + class UnityCatalogSourceConfig( + SQLCommonConfig, StatefulIngestionConfigBase, BaseUsageConfig, DatasetSourceConfigMixin, @@ -217,15 +246,34 @@ class UnityCatalogSourceConfig( description="Generate usage statistics.", ) - profiling: UnityCatalogProfilerConfig = Field( - default=UnityCatalogProfilerConfig(), description="Data profiling configuration" + profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field( # type: ignore + default=UnityCatalogGEProfilerConfig(), + description="Data profiling configuration", + discriminator="method", ) + scheme: str = DATABRICKS + + def get_sql_alchemy_url(self): + return make_sqlalchemy_uri( + scheme=self.scheme, + username="token", + password=self.token, + at=urlparse(self.workspace_url).netloc, + db=None, + uri_opts={ + "http_path": f"/sql/1.0/warehouses/{self.profiling.warehouse_id}" + }, + ) + def is_profiling_enabled(self) -> bool: return self.profiling.enabled and is_profiling_enabled( self.profiling.operation_config ) + def is_ge_profiling(self) -> bool: + return self.profiling.method == "ge" + stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field( default=None, description="Unity Catalog Stateful Ingestion Config." 
) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py new file mode 100644 index 0000000000000..e24ca8330777e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/ge_profiler.py @@ -0,0 +1,170 @@ +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from typing import Iterable, List, Optional + +from sqlalchemy import create_engine +from sqlalchemy.engine import Connection + +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_config import SQLCommonConfig +from datahub.ingestion.source.sql.sql_generic import BaseTable +from datahub.ingestion.source.sql.sql_generic_profiler import ( + GenericProfiler, + TableProfilerRequest, +) +from datahub.ingestion.source.unity.config import UnityCatalogGEProfilerConfig +from datahub.ingestion.source.unity.proxy_types import Table, TableReference +from datahub.ingestion.source.unity.report import UnityCatalogReport + +logger = logging.getLogger(__name__) + + +@dataclass(init=False) +class UnityCatalogSQLGenericTable(BaseTable): + ref: TableReference = field(init=False) + + def __init__(self, table: Table): + self.name = table.name + self.comment = table.comment + self.created = table.created_at + self.last_altered = table.updated_at + self.column_count = len(table.columns) + self.ref = table.ref + self.size_in_bytes = None + self.rows_count = None + self.ddl = None + + +class UnityCatalogGEProfiler(GenericProfiler): + sql_common_config: SQLCommonConfig + profiling_config: UnityCatalogGEProfilerConfig + report: UnityCatalogReport + + def __init__( + self, + sql_common_config: SQLCommonConfig, + profiling_config: UnityCatalogGEProfilerConfig, + report: UnityCatalogReport, + ) -> None: + super().__init__(sql_common_config, report, "databricks") + self.profiling_config = profiling_config + # TODO: Consider passing dataset urn builder directly + # So there is no repeated logic between this class and source.py + + def get_workunits(self, tables: List[Table]) -> Iterable[MetadataWorkUnit]: + # Extra default SQLAlchemy option for better connection pooling and threading. 
+ # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow + self.config.options.setdefault( + "max_overflow", self.profiling_config.max_workers + ) + + url = self.config.get_sql_alchemy_url() + engine = create_engine(url, **self.config.options) + conn = engine.connect() + + profile_requests = [] + with ThreadPoolExecutor( + max_workers=self.profiling_config.max_workers + ) as executor: + futures = [ + executor.submit( + self.get_unity_profile_request, + UnityCatalogSQLGenericTable(table), + conn, + ) + for table in tables + ] + + try: + for i, completed in enumerate( + as_completed(futures, timeout=self.profiling_config.max_wait_secs) + ): + profile_request = completed.result() + if profile_request is not None: + profile_requests.append(profile_request) + if i > 0 and i % 100 == 0: + logger.info(f"Finished table-level profiling for {i} tables") + except TimeoutError: + logger.warning("Timed out waiting to complete table-level profiling.") + + if len(profile_requests) == 0: + return + + yield from self.generate_profile_workunits( + profile_requests, + max_workers=self.config.profiling.max_workers, + platform=self.platform, + profiler_args=self.get_profile_args(), + ) + + def get_dataset_name(self, table_name: str, schema_name: str, db_name: str) -> str: + # Note: unused... ideally should share logic with TableReference + return f"{db_name}.{schema_name}.{table_name}" + + def get_unity_profile_request( + self, table: UnityCatalogSQLGenericTable, conn: Connection + ) -> Optional[TableProfilerRequest]: + # TODO: Reduce code duplication with get_profile_request + skip_profiling = False + profile_table_level_only = self.profiling_config.profile_table_level_only + + dataset_name = table.ref.qualified_table_name + try: + table.size_in_bytes = _get_dataset_size_in_bytes(table, conn) + except Exception as e: + logger.warning(f"Failed to get table size for {dataset_name}: {e}") + + if table.size_in_bytes is None: + self.report.num_profile_missing_size_in_bytes += 1 + if not self.is_dataset_eligible_for_profiling( + dataset_name, + size_in_bytes=table.size_in_bytes, + last_altered=table.last_altered, + rows_count=0, # Can't get row count ahead of time + ): + # Profile only table level if dataset is filtered from profiling + # due to size limits alone + if self.is_dataset_eligible_for_profiling( + dataset_name, + last_altered=table.last_altered, + size_in_bytes=None, + rows_count=None, + ): + profile_table_level_only = True + else: + skip_profiling = True + + if table.column_count == 0: + skip_profiling = True + + if skip_profiling: + if self.profiling_config.report_dropped_profiles: + self.report.report_dropped(dataset_name) + return None + + self.report.report_entity_profiled(dataset_name) + logger.debug(f"Preparing profiling request for {dataset_name}") + return TableProfilerRequest( + table=table, + pretty_name=dataset_name, + batch_kwargs=dict(schema=table.ref.schema, table=table.name), + profile_table_level_only=profile_table_level_only, + ) + + +def _get_dataset_size_in_bytes( + table: UnityCatalogSQLGenericTable, conn: Connection +) -> Optional[int]: + name = ".".join( + conn.dialect.identifier_preparer.quote(c) + for c in [table.ref.catalog, table.ref.schema, table.ref.table] + ) + row = conn.execute(f"DESCRIBE DETAIL {name}").fetchone() + if row is None: + return None + else: + try: + return int(row._asdict()["sizeInBytes"]) + except Exception: + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py 
b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 4153d9dd88eb8..7f19b6e2103ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py @@ -2,15 +2,13 @@ from typing import Tuple from datahub.ingestion.api.report import EntityFilterReport -from datahub.ingestion.source.state.stale_entity_removal_handler import ( - StaleEntityRemovalSourceReport, -) +from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.utilities.lossy_collections import LossyDict, LossyList @dataclass -class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): +class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport): metastores: EntityFilterReport = EntityFilterReport.field(type="metastore") catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog") schemas: EntityFilterReport = EntityFilterReport.field(type="schema") @@ -36,5 +34,6 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): profile_table_errors: LossyDict[str, LossyList[Tuple[str, str]]] = field( default_factory=LossyDict ) + num_profile_missing_size_in_bytes: int = 0 num_profile_failed_unsupported_column_type: int = 0 num_profile_failed_int_casts: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 44b5bbbcb0ceb..03b4f61a512d0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -2,7 +2,6 @@ import re import time from concurrent.futures import ThreadPoolExecutor -from datetime import timedelta from typing import Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin @@ -52,9 +51,14 @@ from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionSourceBase, ) -from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig +from datahub.ingestion.source.unity.analyze_profiler import UnityCatalogAnalyzeProfiler +from datahub.ingestion.source.unity.config import ( + UnityCatalogAnalyzeProfilerConfig, + UnityCatalogGEProfilerConfig, + UnityCatalogSourceConfig, +) from datahub.ingestion.source.unity.connection_test import UnityCatalogConnectionTest -from datahub.ingestion.source.unity.profiler import UnityCatalogProfiler +from datahub.ingestion.source.unity.ge_profiler import UnityCatalogGEProfiler from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy from datahub.ingestion.source.unity.proxy_types import ( DATA_TYPE_REGISTRY, @@ -170,6 +174,9 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): self.view_refs: Set[TableReference] = set() self.notebooks: FileBackedDict[Notebook] = FileBackedDict() + # Global map of tables, for profiling + self.tables: FileBackedDict[Table] = FileBackedDict() + @staticmethod def test_connection(config_dict: dict) -> TestConnectionReport: return UnityCatalogConnectionTest(config_dict).get_connection_test() @@ -233,16 +240,24 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: if self.config.is_profiling_enabled(): self.report.report_ingestion_stage_start("Wait on warehouse") assert wait_on_warehouse - timeout = timedelta(seconds=self.config.profiling.max_wait_secs) - wait_on_warehouse.result(timeout) - profiling_extractor 
= UnityCatalogProfiler( - self.config.profiling, - self.report, - self.unity_catalog_api_proxy, - self.gen_dataset_urn, - ) + wait_on_warehouse.result() + self.report.report_ingestion_stage_start("Profiling") - yield from profiling_extractor.get_workunits(self.table_refs) + if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): + yield from UnityCatalogAnalyzeProfiler( + self.config.profiling, + self.report, + self.unity_catalog_api_proxy, + self.gen_dataset_urn, + ).get_workunits(self.table_refs) + elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): + yield from UnityCatalogGEProfiler( + sql_common_config=self.config, + profiling_config=self.config.profiling, + report=self.report, + ).get_workunits(list(self.tables.values())) + else: + raise ValueError("Unknown profiling config method") def build_service_principal_map(self) -> None: try: @@ -358,6 +373,16 @@ def process_tables(self, schema: Schema) -> Iterable[MetadataWorkUnit]: self.report.tables.dropped(table.id, f"table ({table.table_type})") continue + if ( + self.config.is_profiling_enabled() + and self.config.is_ge_profiling() + and self.config.profiling.pattern.allowed( + table.ref.qualified_table_name + ) + and not table.is_view + ): + self.tables[table.ref.qualified_table_name] = table + if table.is_view: self.view_refs.add(table.ref) else: diff --git a/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json b/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json index 38b03ce238d1c..a86ed53406e40 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_mces_no_db_golden.json @@ -2254,30 +2254,17 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } @@ -2625,8 +2612,7 @@ { "fieldPath": "col", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 } ] } @@ -2655,8 +2641,7 @@ { "fieldPath": "dummy", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 } ] } @@ -2738,4 +2723,4 @@ "lastRunId": "no-run-id-provided" } } -] \ No newline at end of file +] diff --git a/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json b/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json index 5cfba57247bd3..b5ebca424d9a2 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_mces_with_db_golden.json @@ -16,7 +16,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -31,7 +32,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -46,7 +48,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -63,7 +66,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": 
"mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -80,7 +84,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -95,7 +100,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -110,7 +116,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -230,7 +237,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -247,7 +255,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -264,7 +273,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -284,7 +294,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -299,7 +310,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -395,7 +407,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -412,7 +425,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -429,7 +443,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -449,7 +464,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -572,7 +588,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } }, { @@ -593,37 +610,25 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-test" + "runId": "mysql-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json b/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json index 7597013bd873a..634e04984986d 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_table_row_count_estimate_only.json @@ -16,7 +16,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -31,7 +32,8 @@ }, "systemMetadata": { 
"lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -46,7 +48,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -63,7 +66,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -78,7 +82,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -93,7 +98,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -213,7 +219,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -230,7 +237,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -250,7 +258,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -265,7 +274,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -361,7 +371,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -378,7 +389,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -398,7 +410,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -420,88 +433,44 @@ "fieldPath": "id", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "min": "1", - "max": "5", - "mean": "3.0", - "median": "3", - "stdev": "1.5811388300841898", - "sampleValues": [ - "1", - "2", - "3", - "4", - "5" - ] + "nullCount": 0 }, { "fieldPath": "company", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Company A", - "Company B", - "Company C", - "Company D", - "Company E" - ] + "nullCount": 0 }, { "fieldPath": "last_name", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Axen", - "Bedecs", - "Donnell", - "Gratacos Solsona", - "Lee" - ] + "nullCount": 0 }, { "fieldPath": "first_name", "uniqueCount": 5, "uniqueProportion": 1, - "nullCount": 0, - "sampleValues": [ - "Anna", - "Antonio", - "Christina", - "Martin", - "Thomas" - ] + "nullCount": 0 }, { "fieldPath": "email_address", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "priority", "uniqueCount": 3, "uniqueProportion": 0.75, - "nullCount": 0, - "min": "3.8", - "max": "4.9", - "mean": "4.175000011920929", - "median": "4.0", - "stdev": "0.49244294899530355", - "sampleValues": [ - "4.0", - "4.9", - "4.0", - "3.8" - ] + "nullCount": 
0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } }, { @@ -522,37 +491,25 @@ { "fieldPath": "id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "description", "uniqueCount": 0, - "nullCount": 0, - "sampleValues": [] + "nullCount": 0 }, { "fieldPath": "customer_id", "uniqueCount": 0, - "nullCount": 0, - "min": "None", - "max": "None", - "mean": "None", - "median": "None", - "stdev": "0.0", - "sampleValues": [] + "nullCount": 0 } ] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "mysql-2020_04_14-07_00_00" + "runId": "mysql-2020_04_14-07_00_00", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_unity_catalog_config.py b/metadata-ingestion/tests/unit/test_unity_catalog_config.py index 4be6f60171844..4098ed4074de2 100644 --- a/metadata-ingestion/tests/unit/test_unity_catalog_config.py +++ b/metadata-ingestion/tests/unit/test_unity_catalog_config.py @@ -38,7 +38,11 @@ def test_profiling_requires_warehouses_id(): { "token": "token", "workspace_url": "https://workspace_url", - "profiling": {"enabled": True, "warehouse_id": "my_warehouse_id"}, + "profiling": { + "enabled": True, + "method": "ge", + "warehouse_id": "my_warehouse_id", + }, } ) assert config.profiling.enabled is True @@ -47,7 +51,7 @@ def test_profiling_requires_warehouses_id(): { "token": "token", "workspace_url": "https://workspace_url", - "profiling": {"enabled": False}, + "profiling": {"enabled": False, "method": "ge"}, } ) assert config.profiling.enabled is False
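
For context on the config changes above, here is a minimal sketch (not part of the patch) of how the new `method`-discriminated `profiling` section resolves when a recipe is parsed, and how the GE path derives its SQLAlchemy connection URL. The import paths, class names, and the `is_ge_profiling()` / `get_sql_alchemy_url()` methods come from `unity/config.py` as changed in this PR; the workspace URL, token, and warehouse id are placeholders.

```python
# Sketch only: assumes a DataHub install that includes this PR's unity-catalog changes.
from datahub.ingestion.source.unity.config import (
    UnityCatalogAnalyzeProfilerConfig,
    UnityCatalogGEProfilerConfig,
    UnityCatalogSourceConfig,
)

config = UnityCatalogSourceConfig.parse_obj(
    {
        "workspace_url": "https://my-workspace.cloud.databricks.com",  # placeholder
        "token": "<databricks-token>",  # placeholder
        "profiling": {
            # "ge" selects UnityCatalogGEProfilerConfig, "analyze" selects
            # UnityCatalogAnalyzeProfilerConfig via the pydantic discriminated union.
            "method": "ge",
            "enabled": True,
            "warehouse_id": "<warehouse-id>",  # placeholder
        },
    }
)

# The `method` discriminator picks the concrete profiling config class.
assert isinstance(config.profiling, UnityCatalogGEProfilerConfig)
assert config.is_ge_profiling()

# The GE profiler connects through the databricks-sql-connector SQLAlchemy
# dialect; the URL embeds the token and the SQL warehouse's http_path.
print(config.get_sql_alchemy_url())
```

The same recipe with `method: analyze` would resolve to `UnityCatalogAnalyzeProfilerConfig`, and `source.py` dispatches on that type to run either `UnityCatalogAnalyzeProfiler` or `UnityCatalogGEProfiler`.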