Skip to content

Commit

Permalink
fix(bigquery): isolate bigquery-relevant sql parser to its own class
Browse files Browse the repository at this point in the history
  • Loading branch information
vgaidass committed Apr 29, 2022
1 parent 4e87394 commit 89816e3
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.sql_parser import DefaultSQLParser
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -550,7 +550,7 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
# If there is a view being referenced then bigquery sends both the view as well as underlying table
# in the references. There is no distinction between direct/base objects accessed. So doing sql parsing
# to ensure we only use direct objects accessed for lineage
parser = DefaultSQLParser(e.query)
parser = BigQuerySQLParser(e.query)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
Expand Down
80 changes: 80 additions & 0 deletions metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import re
from typing import List

import sqlparse

from datahub.utilities.sql_parser import SqlLineageSQLParser, SQLParser


class BigQuerySQLParser(SQLParser):
    """SQL parser that rewrites BigQuery-specific syntax before lineage extraction.

    The incoming query is normalised by :meth:`parse_sql_query` and the result
    is handed to a ``SqlLineageSQLParser`` instance, to which ``get_tables``
    and ``get_columns`` are delegated.
    """

    # Underlying parser that receives the pre-processed query.
    parser: SQLParser

    def __init__(self, sql_query: str) -> None:
        """Pre-process ``sql_query`` and build the delegate parser from the result."""
        super().__init__(sql_query)

        self._parsed_sql_query = self.parse_sql_query(sql_query)
        self.parser = SqlLineageSQLParser(self._parsed_sql_query)

    def parse_sql_query(self, sql_query: str) -> str:
        """Return ``sql_query`` rewritten into a form the delegate parser accepts.

        The order of the steps matters: the comment sign, ``.from`` field and
        CTE-name rewrites run on the raw text, then ``sqlparse.format`` strips
        comments and re-indents, and only then are object names after
        ``CREATE ... TABLE/VIEW`` and ``FROM`` wrapped in backticks.
        """
        sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
        sql_query = BigQuerySQLParser._escape_keyword_from_as_field_name(sql_query)
        sql_query = BigQuerySQLParser._escape_cte_name_after_keyword_with(sql_query)

        sql_query = sqlparse.format(
            sql_query.strip(),
            reindent_aligned=True,
            strip_comments=True,
        )

        sql_query = BigQuerySQLParser._escape_table_or_view_name_at_create_statement(
            sql_query
        )
        sql_query = BigQuerySQLParser._escape_object_name_after_keyword_from(sql_query)

        return sql_query

    @staticmethod
    def _parse_bigquery_comment_sign(sql_query: str) -> str:
        """Rewrite every ``#...`` run up to end of line as a ``-- ...`` comment.

        The pattern contains no letters, so no case-insensitivity flag is needed.
        """
        return re.sub(r"#(.*)", r"-- \1", sql_query)

    @staticmethod
    def _escape_keyword_from_as_field_name(sql_query: str) -> str:
        """Wrap ``<alias>.from`` field references in backticks.

        The trailing ``\\b`` restricts the match to a field named exactly
        ``from``; without it a reference such as ``t.from_date`` would be
        rewritten to ```t.from`_date`` and corrupt the query.
        """
        return re.sub(r"(\w*\.from)\b", r"`\1`", sql_query, flags=re.IGNORECASE)

    @staticmethod
    def _escape_cte_name_after_keyword_with(sql_query: str) -> str:
        """
        Wrap the name following each ``with<whitespace>`` in backticks,
        in case a cte name is one of reserved words.
        Names already starting with a backtick are left untouched.
        """
        return re.sub(r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE)

    @staticmethod
    def _escape_table_or_view_name_at_create_statement(sql_query: str) -> str:
        """
        Wrap the table/view name of a ``CREATE ... TABLE|VIEW <name> AS`` in backticks.
        Reason: in case table name contains hyphens which breaks sqllineage later on
        """
        return re.sub(
            r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)",
            r"\1\2`\3`",
            sql_query,
            flags=re.IGNORECASE,
        )

    @staticmethod
    def _escape_object_name_after_keyword_from(sql_query: str) -> str:
        """
        Wrap the object name following ``from<whitespace>`` in backticks.
        Reason: in case table name contains hyphens which breaks sqllineage later on
        Note: ignore cases of having keyword FROM as part of datetime function EXTRACT
        """
        # Groups 1 and 2 live inside the look-behind assertions, so the
        # ``from`` keyword is group 3 and the object name is group 4.
        return re.sub(
            r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)",
            r"\3`\4`",
            sql_query,
            flags=re.IGNORECASE,
        )

    def get_tables(self) -> List[str]:
        """Return the tables reported by the delegate parser."""
        return self.parser.get_tables()

    def get_columns(self) -> List[str]:
        """Return the columns reported by the delegate parser."""
        return self.parser.get_columns()
Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,6 @@ def __init__(self, sql_query: str) -> None:
if "lateral flatten" in sql_query:
sql_query = sql_query[: sql_query.find("lateral flatten")]

# BigQuery can use # as a comment symbol and that may break the parsing
# if the comment starts without a space sign after #
# and the comment is written inside SQL DDL statement (e.g., after CREATE...AS)
sql_query = re.sub(r"#([^ ])", r"# \1", sql_query, flags=re.IGNORECASE)

# Wrap calls to field '<table_alias>.from' in ` so it would not be taken as a reserved keyword
sql_query = re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE)

# Apply sqlparser formatting to get rid of comments and reindent keywords
# which should remove some potential inconsistencies in parsing output
sql_query = sqlparse.format(
sql_query.strip(),
reindent_aligned=True,
strip_comments=True,
)

# SqlLineageParser does not allow table/view names not being wrapped in quotes or backticks
# Add ` signs before and after supposed object name that comes right after reserved word FROM
# note 1: this excludes date/time/datetime extract functions like EXTRACT DATE FROM...
# note 2: this includes adding ` signs to CTE aliases
sql_query = re.sub(
r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)",
r"\3`\4`",
sql_query,
flags=re.IGNORECASE,
)

# Add ` signs before and after table/view name at the beginning of SQL DDL statement (e.g. CREATE/CREATE OR REPLACE...AS)
sql_query = re.sub(
r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)",
r"\1\2`\3`",
sql_query,
flags=re.IGNORECASE,
)

# Add ` signs before and after CTE alias name at WITH
sql_query = re.sub(
r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE
)

# Replace reserved words that break SqlLineageParser
self.token_to_original: Dict[str, str] = {
self._DATE_SWAP_TOKEN: "date",
Expand Down Expand Up @@ -109,8 +69,11 @@ def __init__(self, sql_query: str) -> None:
self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
Expand Down
119 changes: 119 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser


def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted():
    """A query mixing block, ``--`` and ``#`` comments still yields its source table."""
    commented_sql = """
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
#This, comment will not break sqllineage
SELECT foo
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.dataset.src_tbl`
"""
    expected_tables = ["project.dataset.src_tbl"]

    bigquery_parser = BigQuerySQLParser(sql_query=commented_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_keyword_data_is_accepted():
    """A CTE named ``data`` must not prevent the source table from being found."""
    cte_sql = """
WITH data AS (
SELECT
*,
'foo' AS bar
FROM `project.example_dataset.example_table`
)
SELECT * FROM data
"""
    expected_tables = ["project.example_dataset.example_table"]

    bigquery_parser = BigQuerySQLParser(sql_query=cte_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_keyword_admin_is_accepted():
    """A CTE named ``admin`` must not prevent the source table from being found."""
    cte_sql = """
WITH admin AS (
SELECT *
FROM `project.example_dataset.example_table`
)
SELECT * FROM admin
"""
    expected_tables = ["project.example_dataset.example_table"]

    bigquery_parser = BigQuerySQLParser(sql_query=cte_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_cte_alias_as_keyword_is_accepted():
    """A CTE aliased ``map`` inside a CREATE statement still yields both source tables."""
    create_with_cte_sql = """
CREATE OR REPLACE TABLE `project.dataset.test_table` AS
WITH map AS (
SELECT a.col_1,
b.col_2
FROM (
SELECT DISTINCT *
FROM (
SELECT col_1
FROM `project.dataset.source_table_a`
)
) a
JOIN `project.dataset.source_table_b` b
ON a.col_1 = b.col_1
)
SELECT *
FROM map
"""
    expected_tables = [
        "project.dataset.source_table_a",
        "project.dataset.source_table_b",
    ]

    bigquery_parser = BigQuerySQLParser(sql_query=create_with_cte_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_create_or_replace_view_name_with_hyphens_is_accepted():
    """An unquoted, hyphenated target view name must not break source table detection."""
    hyphenated_view_sql = """
CREATE OR REPLACE VIEW test-project.dataset.test_view AS
SELECT *
FROM project.dataset.src_table_a
UNION
SELECT * FROM `project.dataset.src_table_b`
"""
    expected_tables = [
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
    ]

    bigquery_parser = BigQuerySQLParser(sql_query=hyphenated_view_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_source_table_name_with_hyphens_is_accepted():
    """An unquoted, hyphenated source table name is reported in full."""
    hyphenated_source_sql = """
CREATE OR REPLACE VIEW `project.dataset.test_view` AS
SELECT *
FROM test-project.dataset.src_table
"""
    expected_tables = ["test-project.dataset.src_table"]

    bigquery_parser = BigQuerySQLParser(sql_query=hyphenated_source_sql)

    assert bigquery_parser.get_tables() == expected_tables


def test_bigquery_sql_lineage_from_as_column_name_is_accepted():
    """A column literally named ``from`` must not be mistaken for the FROM keyword."""
    from_column_sql = """
CREATE OR REPLACE VIEW `project.dataset.test_view` AS
SELECT x.from AS col
FROM project.dataset.src_table AS x
"""
    expected_tables = ["project.dataset.src_table"]

    bigquery_parser = BigQuerySQLParser(sql_query=from_column_sql)

    assert bigquery_parser.get_tables() == expected_tables
Loading

0 comments on commit 89816e3

Please sign in to comment.