Skip to content

Commit

Permalink
fix(bigquery): add handling of sql keywords 'from', 'with', 'admin', …
Browse files Browse the repository at this point in the history
…'hour'
  • Loading branch information
vgaidass committed Apr 26, 2022
1 parent cc8e8fa commit 3d6c6e6
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 28 deletions.
96 changes: 70 additions & 26 deletions metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict, List, Set

from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException

try:
import sqlparse
Expand All @@ -20,8 +21,10 @@

class SqlLineageSQLParserImpl:
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
_DATA_SWAP_TOKEN = "__d_a_t_a"
_ADMIN_SWAP_TOKEN = "__a_d_m_i_n"
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

Expand All @@ -32,18 +35,59 @@ def __init__(self, sql_query: str) -> None:
if "lateral flatten" in sql_query:
sql_query = sql_query[: sql_query.find("lateral flatten")]

# BigQuery can use # as a comment symbol and that may break the parsing
# if the comment starts without a space sign after #
# and the comment is written inside SQL DDL statement (e.g., after CREATE...AS)
sql_query = re.sub(r"#([^ ])", r"# \1", sql_query, flags=re.IGNORECASE)

# Wrap calls to field '<table_alias>.from' in ` so it would not be taken as a reserved keyword
sql_query = re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE)

# Apply sqlparser formatting to get rid of comments and reindent keywords
# which should remove some potential inconsistencies in parsing output
sql_query = sqlparse.format(
sql_query.strip(),
reindent_aligned=True,
strip_comments=True,
)

# SqlLineageParser does not allow table/view names not being wrapped in quotes or backticks
# Add ` signs before and after supposed object name that comes right after reserved word FROM
# note 1: this excludes date/time/datetime extract functions like EXTRACT DATE FROM...
# note 2: this includes adding ` signs to CTE aliases
sql_query = re.sub(
r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)",
r"\3`\4`",
sql_query,
flags=re.IGNORECASE,
)

# Add ` signs before and after table/view name at the beginning of SQL DDL statement (e.g. CREATE/CREATE OR REPLACE...AS)
sql_query = re.sub(
r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)",
r"\1\2`\3`",
sql_query,
flags=re.IGNORECASE,
)

# Add ` signs before and after CTE alias name at WITH
sql_query = re.sub(
r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE
)

# Replace reserved words that break SqlLineageParser
self.token_to_original: Dict[str, str] = {
self._DATE_SWAP_TOKEN: "date",
self._HOUR_SWAP_TOKEN: "hour",
self._TIMESTAMP_SWAP_TOKEN: "timestamp",
self._DATA_SWAP_TOKEN: "data",
self._ADMIN_SWAP_TOKEN: "admin",
}
for replacement, original in self.token_to_original.items():
sql_query = re.sub(
rf"(\b{original}\b)", rf"{replacement}", sql_query, flags=re.IGNORECASE
)

sql_query = re.sub(r"#([^ ])", r"# \1", sql_query)

# SqlLineageParser lowercarese tablenames and we need to replace Looker specific token which should be uppercased
sql_query = re.sub(
rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
Expand All @@ -57,36 +101,36 @@ def __init__(self, sql_query: str) -> None:
# Replace lookml templates with the variable otherwise sqlparse can't parse ${
sql_query = re.sub(r"(\${)(.+)(})", r"\2", sql_query)
if sql_query != original_sql_query:
logger.debug(f"rewrote original query {original_sql_query} as {sql_query}")
logger.debug(f"Rewrote original query {original_sql_query} as {sql_query}")

self._sql = sql_query

self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]
try:
self._stmt = [
s
for s in sqlparse.parse(
sqlparse.format(
self._sql.strip(),
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]

with unittest.mock.patch(
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")

def get_tables(self) -> List[str]:
result: List[str] = list()
Expand Down
185 changes: 183 additions & 2 deletions metadata-ingestion/tests/unit/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,191 @@ def test_with_keyword_data():
SELECT
*,
'foo' AS bar
FROM `project.example_dataset.another_example_table_vw`
FROM `project.example_dataset.example_table`
)
SELECT * FROM data
"""
)

assert parser.get_tables() == ["project.example_dataset.another_example_table_vw"]
assert parser.get_tables() == ["project.example_dataset.example_table"]


def test_with_keyword_admin():
parser = SqlLineageSQLParser(
sql_query="""
WITH admin AS (
SELECT *
FROM `project.example_dataset.example_table`
)
SELECT * FROM admin
"""
)

assert parser.get_tables() == ["project.example_dataset.example_table"]


def test_sqllineage_sql_parser_create_or_replace_table_name_not_wrapped_in_backticks():
parser = SqlLineageSQLParser(
sql_query="""
CREATE OR REPLACE TABLE project.dataset.test_table AS
WITH cte AS (
SELECT
*,
EXTRACT(MICROSECOND FROM time_field) AS col_1,
EXTRACT(MILLISECOND FROM time_field) AS col_2,
EXTRACT(SECOND FROM time_field) AS col_3,
EXTRACT(MINUTE FROM time_field) AS col_4,
EXTRACT(HOUR FROM time_field) AS col_5,
EXTRACT(DAYOFWEEK FROM time_field) AS col_6,
EXTRACT(DAY FROM time_field) AS col_7,
EXTRACT(DAYOFYEAR FROM time_field) AS col_8,
EXTRACT(WEEK FROM time_field) AS col_9,
EXTRACT(WEEK FROM time_field) AS col_10,
EXTRACT(ISOWEEK FROM time_field) AS col_11,
EXTRACT(MONTH FROM time_field) AS col_12,
EXTRACT(QUARTER FROM time_field) AS col_13,
EXTRACT(YEAR FROM time_field) AS col_14,
EXTRACT(ISOYEAR FROM time_field) AS col_15,
EXTRACT(DATE FROM time_field) AS col_16,
EXTRACT(TIME FROM time_field) AS col_17
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
)

assert parser.get_tables() == [
"project.dataset.test_table",
"project.dataset.src_table_a",
"project.dataset.src_table_b",
"project.dataset.src_table_c",
]


def test_sqllineage_sql_parser_create_or_replace_view_name_not_wrapped_in_backticks():
parser = SqlLineageSQLParser(
sql_query="""
CREATE OR REPLACE VIEW project.dataset.test_view AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
)

assert parser.get_tables() == [
"project.dataset.test_view",
"project.dataset.src_table_a",
"project.dataset.src_table_b",
"project.dataset.src_table_c",
]


def test_sqllineage_sql_parser_create_table_name_not_wrapped_in_backticks():
parser = SqlLineageSQLParser(
sql_query="""
CREATE TABLE project.dataset.test_table AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
)

assert parser.get_tables() == [
"project.dataset.test_table",
"project.dataset.src_table_a",
"project.dataset.src_table_b",
"project.dataset.src_table_c",
]


def test_sqllineage_sql_parser_create_view_name_not_wrapped_in_backticks():
parser = SqlLineageSQLParser(
sql_query="""
CREATE VIEW project.dataset.test_view AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
)

assert parser.get_tables() == [
"project.dataset.test_view",
"project.dataset.src_table_a",
"project.dataset.src_table_b",
"project.dataset.src_table_c",
]


def test_sqllineage_sql_parser_from_as_column_name_is_escaped():
parser = SqlLineageSQLParser(
sql_query="""
CREATE TABLE project.dataset.test_table AS
SELECT x.from AS col
FROM project.dataset.src_table AS x
"""
)

assert parser.get_tables() == [
"project.dataset.test_table",
"project.dataset.src_table",
]


def test_sqllineage_sql_parser_cte_alias_near_with_keyword_is_escaped():
parser = SqlLineageSQLParser(
sql_query="""
create or replace view `project.dataset.test_view` as
with map as (
select col_1
col_2
from `project.dataset.src_table_a` a
join `project.dataset.src_table_b` b
on a.col_1 = b.col_2
),
cte_2 as (
select * from `project.dataset.src_table_c`
)
select * from map
union all
select * from cte_2
"""
)

assert parser.get_tables() == [
"project.dataset.test_view",
"project.dataset.src_table_a",
"project.dataset.src_table_b",
"project.dataset.src_table_c",
]

0 comments on commit 3d6c6e6

Please sign in to comment.