-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(bigquery): improve handling of extracted audit log sql queries (#…
- Loading branch information
Showing
6 changed files
with
483 additions
and
63 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import re | ||
from typing import List | ||
|
||
import sqlparse | ||
|
||
from datahub.utilities.sql_parser import SqlLineageSQLParser, SQLParser | ||
|
||
|
||
class BigQuerySQLParser(SQLParser): | ||
parser: SQLParser | ||
|
||
def __init__(self, sql_query: str) -> None: | ||
super().__init__(sql_query) | ||
|
||
self._parsed_sql_query = self.parse_sql_query(sql_query) | ||
self.parser = SqlLineageSQLParser(self._parsed_sql_query) | ||
|
||
def parse_sql_query(self, sql_query: str) -> str: | ||
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query) | ||
sql_query = BigQuerySQLParser._escape_keyword_from_as_field_name(sql_query) | ||
sql_query = BigQuerySQLParser._escape_cte_name_after_keyword_with(sql_query) | ||
|
||
sql_query = sqlparse.format( | ||
sql_query.strip(), | ||
reindent_aligned=True, | ||
strip_comments=True, | ||
) | ||
|
||
sql_query = BigQuerySQLParser._escape_table_or_view_name_at_create_statement( | ||
sql_query | ||
) | ||
sql_query = BigQuerySQLParser._escape_object_name_after_keyword_from(sql_query) | ||
|
||
return sql_query | ||
|
||
@staticmethod | ||
def _parse_bigquery_comment_sign(sql_query: str) -> str: | ||
return re.sub(r"#(.*)", r"-- \1", sql_query, flags=re.IGNORECASE) | ||
|
||
@staticmethod | ||
def _escape_keyword_from_as_field_name(sql_query: str) -> str: | ||
return re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE) | ||
|
||
@staticmethod | ||
def _escape_cte_name_after_keyword_with(sql_query: str) -> str: | ||
""" | ||
Escape the first cte name in case it is one of reserved words | ||
""" | ||
return re.sub(r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE) | ||
|
||
@staticmethod | ||
def _escape_table_or_view_name_at_create_statement(sql_query: str) -> str: | ||
""" | ||
Reason: in case table name contains hyphens which breaks sqllineage later on | ||
""" | ||
return re.sub( | ||
r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)", | ||
r"\1\2`\3`", | ||
sql_query, | ||
flags=re.IGNORECASE, | ||
) | ||
|
||
@staticmethod | ||
def _escape_object_name_after_keyword_from(sql_query: str) -> str: | ||
""" | ||
Reason: in case table name contains hyphens which breaks sqllineage later on | ||
Note: ignore cases of having keyword FROM as part of datetime function EXTRACT | ||
""" | ||
return re.sub( | ||
r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)", | ||
r"\3`\4`", | ||
sql_query, | ||
flags=re.IGNORECASE, | ||
) | ||
|
||
def get_tables(self) -> List[str]: | ||
return self.parser.get_tables() | ||
|
||
def get_columns(self) -> List[str]: | ||
return self.parser.get_columns() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
119 changes: 119 additions & 0 deletions
119
metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser | ||
|
||
|
||
def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
/* | ||
HERE IS A STANDARD COMMENT BLOCK | ||
THIS WILL NOT BREAK sqllineage | ||
*/ | ||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS | ||
#This, comment will not break sqllineage | ||
SELECT foo | ||
-- this comment will not break sqllineage either | ||
# this comment will not break sqllineage either | ||
FROM `project.dataset.src_tbl` | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == ["project.dataset.src_tbl"] | ||
|
||
|
||
def test_bigquery_sql_lineage_keyword_data_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
WITH data AS ( | ||
SELECT | ||
*, | ||
'foo' AS bar | ||
FROM `project.example_dataset.example_table` | ||
) | ||
SELECT * FROM data | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == ["project.example_dataset.example_table"] | ||
|
||
|
||
def test_bigquery_sql_lineage_keyword_admin_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
WITH admin AS ( | ||
SELECT * | ||
FROM `project.example_dataset.example_table` | ||
) | ||
SELECT * FROM admin | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == ["project.example_dataset.example_table"] | ||
|
||
|
||
def test_bigquery_sql_lineage_cte_alias_as_keyword_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
CREATE OR REPLACE TABLE `project.dataset.test_table` AS | ||
WITH map AS ( | ||
SELECT a.col_1, | ||
b.col_2 | ||
FROM ( | ||
SELECT DISTINCT * | ||
FROM ( | ||
SELECT col_1 | ||
FROM `project.dataset.source_table_a` | ||
) | ||
) a | ||
JOIN `project.dataset.source_table_b` b | ||
ON a.col_1 = b.col_1 | ||
) | ||
SELECT * | ||
FROM map | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == [ | ||
"project.dataset.source_table_a", | ||
"project.dataset.source_table_b", | ||
] | ||
|
||
|
||
def test_bigquery_sql_lineage_create_or_replace_view_name_with_hyphens_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
CREATE OR REPLACE VIEW test-project.dataset.test_view AS | ||
SELECT * | ||
FROM project.dataset.src_table_a | ||
UNION | ||
SELECT * FROM `project.dataset.src_table_b` | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == [ | ||
"project.dataset.src_table_a", | ||
"project.dataset.src_table_b", | ||
] | ||
|
||
|
||
def test_bigquery_sql_lineage_source_table_name_with_hyphens_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
CREATE OR REPLACE VIEW `project.dataset.test_view` AS | ||
SELECT * | ||
FROM test-project.dataset.src_table | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == ["test-project.dataset.src_table"] | ||
|
||
|
||
def test_bigquery_sql_lineage_from_as_column_name_is_accepted(): | ||
parser = BigQuerySQLParser( | ||
sql_query=""" | ||
CREATE OR REPLACE VIEW `project.dataset.test_view` AS | ||
SELECT x.from AS col | ||
FROM project.dataset.src_table AS x | ||
""" | ||
) | ||
|
||
assert parser.get_tables() == ["project.dataset.src_table"] |
Oops, something went wrong.