From 3d6c6e6cb4ebabe97de26866139d9bc78293a570 Mon Sep 17 00:00:00 2001 From: Vladislavs Gaidass Date: Sat, 23 Apr 2022 17:40:39 +0200 Subject: [PATCH] fix(bigquery): add handling of sql keywords 'from', 'with', 'admin', 'hour' --- .../utilities/sql_lineage_parser_impl.py | 96 ++++++--- .../tests/unit/test_utilities.py | 185 +++++++++++++++++- 2 files changed, 253 insertions(+), 28 deletions(-) diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index 15463b4d56ae36..c5961cfa8cd63f 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -5,6 +5,7 @@ from typing import Dict, List, Set from sqllineage.core.holders import Column, SQLLineageHolder +from sqllineage.exceptions import SQLLineageException try: import sqlparse @@ -20,8 +21,10 @@ class SqlLineageSQLParserImpl: _DATE_SWAP_TOKEN = "__d_a_t_e" + _HOUR_SWAP_TOKEN = "__h_o_u_r" _TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p" _DATA_SWAP_TOKEN = "__d_a_t_a" + _ADMIN_SWAP_TOKEN = "__a_d_m_i_n" _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__" _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME" @@ -32,18 +35,59 @@ def __init__(self, sql_query: str) -> None: if "lateral flatten" in sql_query: sql_query = sql_query[: sql_query.find("lateral flatten")] + # BigQuery can use # as a comment symbol and that may break the parsing + # if the comment starts without a space sign after # + # and the comment is written inside SQL DDL statement (e.g., after CREATE...AS) + sql_query = re.sub(r"#([^ ])", r"# \1", sql_query, flags=re.IGNORECASE) + + # Wrap calls to field '.from' in ` so it would not be taken as a reserved keyword + sql_query = re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE) + + # Apply sqlparser formatting to get rid of comments and reindent keywords + # which should remove some potential inconsistencies in parsing output + sql_query = sqlparse.format( + sql_query.strip(), + reindent_aligned=True, + strip_comments=True, + ) + + # SqlLineageParser does not allow table/view names not being wrapped in quotes or backticks + # Add ` signs before and after supposed object name that comes right after reserved word FROM + # note 1: this excludes date/time/datetime extract functions like EXTRACT DATE FROM... + # note 2: this includes adding ` signs to CTE aliases + sql_query = re.sub( + r"(? None: # Replace lookml templates with the variable otherwise sqlparse can't parse ${ sql_query = re.sub(r"(\${)(.+)(})", r"\2", sql_query) if sql_query != original_sql_query: - logger.debug(f"rewrote original query {original_sql_query} as {sql_query}") + logger.debug(f"Rewrote original query {original_sql_query} as {sql_query}") self._sql = sql_query - self._stmt = [ - s - for s in sqlparse.parse( - # first apply sqlparser formatting just to get rid of comments, which cause - # inconsistencies in parsing output - sqlparse.format( - self._sql.strip(), - strip_comments=True, - use_space_around_operators=True, - ), - ) - if s.token_first(skip_cm=True) - ] + try: + self._stmt = [ + s + for s in sqlparse.parse( + sqlparse.format( + self._sql.strip(), + use_space_around_operators=True, + ), + ) + if s.token_first(skip_cm=True) + ] - with unittest.mock.patch( - "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", - datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, - ): with unittest.mock.patch( - "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", - datahub.utilities.sqllineage_patch.add_column_lineage_patch, + "sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup", + datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch, ): - self._stmt_holders = [ - LineageAnalyzer().analyze(stmt) for stmt in self._stmt - ] - self._sql_holder = SQLLineageHolder.of(*self._stmt_holders) + with unittest.mock.patch( + "sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage", + datahub.utilities.sqllineage_patch.add_column_lineage_patch, + ): + self._stmt_holders = [ + LineageAnalyzer().analyze(stmt) for stmt in self._stmt + ] + self._sql_holder = SQLLineageHolder.of(*self._stmt_holders) + except SQLLineageException as e: + logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}") def get_tables(self) -> List[str]: result: List[str] = list() diff --git a/metadata-ingestion/tests/unit/test_utilities.py b/metadata-ingestion/tests/unit/test_utilities.py index 491e81d3ac57d4..b2aed9f3088ce2 100644 --- a/metadata-ingestion/tests/unit/test_utilities.py +++ b/metadata-ingestion/tests/unit/test_utilities.py @@ -391,10 +391,191 @@ def test_with_keyword_data(): SELECT *, 'foo' AS bar - FROM `project.example_dataset.another_example_table_vw` + FROM `project.example_dataset.example_table` ) SELECT * FROM data """ ) - assert parser.get_tables() == ["project.example_dataset.another_example_table_vw"] + assert parser.get_tables() == ["project.example_dataset.example_table"] + + +def test_with_keyword_admin(): + parser = SqlLineageSQLParser( + sql_query=""" + WITH admin AS ( + SELECT * + FROM `project.example_dataset.example_table` + ) + SELECT * FROM admin + """ + ) + + assert parser.get_tables() == ["project.example_dataset.example_table"] + + +def test_sqllineage_sql_parser_create_or_replace_table_name_not_wrapped_in_backticks(): + parser = SqlLineageSQLParser( + sql_query=""" + CREATE OR REPLACE TABLE project.dataset.test_table AS + WITH cte AS ( + SELECT + *, + EXTRACT(MICROSECOND FROM time_field) AS col_1, + EXTRACT(MILLISECOND FROM time_field) AS col_2, + EXTRACT(SECOND FROM time_field) AS col_3, + EXTRACT(MINUTE FROM time_field) AS col_4, + EXTRACT(HOUR FROM time_field) AS col_5, + EXTRACT(DAYOFWEEK FROM time_field) AS col_6, + EXTRACT(DAY FROM time_field) AS col_7, + EXTRACT(DAYOFYEAR FROM time_field) AS col_8, + EXTRACT(WEEK FROM time_field) AS col_9, + EXTRACT(WEEK FROM time_field) AS col_10, + EXTRACT(ISOWEEK FROM time_field) AS col_11, + EXTRACT(MONTH FROM time_field) AS col_12, + EXTRACT(QUARTER FROM time_field) AS col_13, + EXTRACT(YEAR FROM time_field) AS col_14, + EXTRACT(ISOYEAR FROM time_field) AS col_15, + EXTRACT(DATE FROM time_field) AS col_16, + EXTRACT(TIME FROM time_field) AS col_17 + FROM project.dataset.src_table_a + ) + SELECT * FROM cte + UNION + SELECT * + FROM project.dataset.src_table_b + UNION + SELECT * FROM `project.dataset.src_table_c` + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_table", + "project.dataset.src_table_a", + "project.dataset.src_table_b", + "project.dataset.src_table_c", + ] + + +def test_sqllineage_sql_parser_create_or_replace_view_name_not_wrapped_in_backticks(): + parser = SqlLineageSQLParser( + sql_query=""" + CREATE OR REPLACE VIEW project.dataset.test_view AS + WITH cte AS ( + SELECT + *, + 'foo' AS bar + FROM project.dataset.src_table_a + ) + SELECT * FROM cte + UNION + SELECT * + FROM project.dataset.src_table_b + UNION + SELECT * FROM `project.dataset.src_table_c` + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_view", + "project.dataset.src_table_a", + "project.dataset.src_table_b", + "project.dataset.src_table_c", + ] + + +def test_sqllineage_sql_parser_create_table_name_not_wrapped_in_backticks(): + parser = SqlLineageSQLParser( + sql_query=""" + CREATE TABLE project.dataset.test_table AS + WITH cte AS ( + SELECT + *, + 'foo' AS bar + FROM project.dataset.src_table_a + ) + SELECT * FROM cte + UNION + SELECT * + FROM project.dataset.src_table_b + UNION + SELECT * FROM `project.dataset.src_table_c` + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_table", + "project.dataset.src_table_a", + "project.dataset.src_table_b", + "project.dataset.src_table_c", + ] + + +def test_sqllineage_sql_parser_create_view_name_not_wrapped_in_backticks(): + parser = SqlLineageSQLParser( + sql_query=""" + CREATE VIEW project.dataset.test_view AS + WITH cte AS ( + SELECT + *, + 'foo' AS bar + FROM project.dataset.src_table_a + ) + SELECT * FROM cte + UNION + SELECT * + FROM project.dataset.src_table_b + UNION + SELECT * FROM `project.dataset.src_table_c` + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_view", + "project.dataset.src_table_a", + "project.dataset.src_table_b", + "project.dataset.src_table_c", + ] + + +def test_sqllineage_sql_parser_from_as_column_name_is_escaped(): + parser = SqlLineageSQLParser( + sql_query=""" + CREATE TABLE project.dataset.test_table AS + SELECT x.from AS col + FROM project.dataset.src_table AS x + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_table", + "project.dataset.src_table", + ] + + +def test_sqllineage_sql_parser_cte_alias_near_with_keyword_is_escaped(): + parser = SqlLineageSQLParser( + sql_query=""" + create or replace view `project.dataset.test_view` as + with map as ( + select col_1 + col_2 + from `project.dataset.src_table_a` a + join `project.dataset.src_table_b` b + on a.col_1 = b.col_2 + ), + cte_2 as ( + select * from `project.dataset.src_table_c` + ) + select * from map + union all + select * from cte_2 + """ + ) + + assert parser.get_tables() == [ + "project.dataset.test_view", + "project.dataset.src_table_a", + "project.dataset.src_table_b", + "project.dataset.src_table_c", + ]