From 15c5a1a7c947d918ba5894f579a5ac0d9285fe96 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Fri, 5 Apr 2024 16:02:04 -0700 Subject: [PATCH] feat(ingest/sql): normalize bigquery partitioned tables when parsing --- .../source/bigquery_v2/bigquery_audit.py | 7 +++ ...est_bigquery_partitioned_table_insert.json | 51 +++++++++++++++++++ .../unit/sql_parsing/test_sqlglot_lineage.py | 17 +++++++ 3 files changed, 75 insertions(+) create mode 100644 metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_partitioned_table_insert.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py index 8cef10ca23448..b7c13cf179fc4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py @@ -92,6 +92,7 @@ def get_table_display_name(self) -> str: - removes shard suffix (table_yyyymmdd-> table) - removes wildcard part (table_yyyy* -> table) - remove time decorator (table@1624046611000 -> table) + - removes partition ids (table$20210101 -> table or table$__UNPARTITIONED__ -> table) """ # if table name ends in _* or * or _yyyy* or _yyyymm* then we strip it as that represents a query on a sharded table shortened_table_name = re.sub(self._BIGQUERY_WILDCARD_REGEX, "", self.table) @@ -103,6 +104,12 @@ def get_table_display_name(self) -> str: f"Found table snapshot. Using {shortened_table_name} as the table name." ) + if "$" in shortened_table_name: + shortened_table_name = shortened_table_name.split("$", maxsplit=1)[0] + logger.debug( + f"Found partitioned table. Using {shortened_table_name} as the table name." + ) + table_name, _ = self.get_table_and_shard(shortened_table_name) return table_name or self.dataset diff --git a/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_partitioned_table_insert.json b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_partitioned_table_insert.json new file mode 100644 index 0000000000000..9acd6ef32f288 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/goldens/test_bigquery_partitioned_table_insert.json @@ -0,0 +1,51 @@ +{ + "query_type": "SELECT", + "query_type_props": {}, + "query_fingerprint": "b4a1bdfa6e5518bb2a5bb75f20faf09b37379a284600708ef610b5cbd6a653d8", + "in_tables": [ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.my-table,PROD)" + ], + "out_tables": [], + "column_lineage": [ + { + "downstream": { + "table": null, + "column": "col1", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "STRING" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.my-table,PROD)", + "column": "col1" + } + ] + }, + { + "downstream": { + "table": null, + "column": "col2", + "column_type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "native_column_type": "STRING" + }, + "upstreams": [ + { + "table": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.my-table,PROD)", + "column": "col2" + } + ] + } + ], + "debug_info": { + "confidence": 0.9, + "generalized_statement": "SELECT * FROM `bq-proj.dataset.my-table$__UNPARTITIONED__`" + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py index de329c6cfe239..c1d629bc6706e 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sqlglot_lineage.py @@ -529,6 +529,23 @@ def test_bigquery_from_sharded_table_wildcard(): ) +def test_bigquery_partitioned_table_insert(): + assert_sql_result( + """ +SELECT * +FROM `bq-proj.dataset.my-table$__UNPARTITIONED__` +""", + dialect="bigquery", + schemas={ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,bq-proj.dataset.my-table,PROD)": { + "col1": "STRING", + "col2": "STRING", + }, + }, + expected_file=RESOURCE_DIR / "test_bigquery_partitioned_table_insert.json", + ) + + def test_bigquery_star_with_replace(): assert_sql_result( """