fix(bigquery): improve handling of extracted audit log sql queries (d…
vgaidass authored and maggiehays committed Aug 1, 2022
1 parent 1754d65 commit 03fd465
Showing 6 changed files with 483 additions and 63 deletions.
@@ -62,7 +62,7 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.sql_parser import DefaultSQLParser
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser

logger = logging.getLogger(__name__)

@@ -181,6 +181,9 @@ def bigquery_audit_metadata_query_template(
"""
Receives a dataset (with project specified) and returns a query template that is used to query exported
AuditLogs containing protoPayloads of type BigQueryAuditMetadata.
Only jobs that meet both of the following are included:
- the job has completed (jobStatus.jobState = "DONE")
- the job produced no errors (jobStatus.errorResults is null)
:param dataset: the dataset to query against in the form of $PROJECT.$DATASET
:param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log
tables
@@ -221,6 +224,7 @@ def bigquery_audit_metadata_query_template(
AND timestamp < "{end_time}"
AND protopayload_auditlog.serviceName="bigquery.googleapis.com"
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL;
"""

@@ -614,7 +618,7 @@ def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[st
# If a view is referenced, BigQuery sends both the view and the underlying table
# in the references, with no distinction between direct and base objects accessed,
# so we parse the SQL to ensure only directly accessed objects are used for lineage.
parser = DefaultSQLParser(e.query)
parser = BigQuerySQLParser(e.query)
referenced_objs = set(
map(lambda x: x.split(".")[-1], parser.get_tables())
)
80 changes: 80 additions & 0 deletions metadata-ingestion/src/datahub/utilities/bigquery_sql_parser.py
@@ -0,0 +1,80 @@
import re
from typing import List

import sqlparse

from datahub.utilities.sql_parser import SqlLineageSQLParser, SQLParser


class BigQuerySQLParser(SQLParser):
parser: SQLParser

def __init__(self, sql_query: str) -> None:
super().__init__(sql_query)

self._parsed_sql_query = self.parse_sql_query(sql_query)
self.parser = SqlLineageSQLParser(self._parsed_sql_query)

def parse_sql_query(self, sql_query: str) -> str:
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
sql_query = BigQuerySQLParser._escape_keyword_from_as_field_name(sql_query)
sql_query = BigQuerySQLParser._escape_cte_name_after_keyword_with(sql_query)

sql_query = sqlparse.format(
sql_query.strip(),
reindent_aligned=True,
strip_comments=True,
)

sql_query = BigQuerySQLParser._escape_table_or_view_name_at_create_statement(
sql_query
)
sql_query = BigQuerySQLParser._escape_object_name_after_keyword_from(sql_query)

return sql_query

@staticmethod
def _parse_bigquery_comment_sign(sql_query: str) -> str:
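"""
Rewrite BigQuery-style '#' comments as standard '--' comments so sqlparse and
sqllineage can handle them, e.g. "#comment" becomes "-- comment"
"""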
return re.sub(r"#(.*)", r"-- \1", sql_query, flags=re.IGNORECASE)

@staticmethod
def _escape_keyword_from_as_field_name(sql_query: str) -> str:
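"""
Escape column references that use the reserved word FROM as a field name,
e.g. x.from becomes `x.from`, so it is not mistaken for the FROM keyword
"""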
return re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE)

@staticmethod
def _escape_cte_name_after_keyword_with(sql_query: str) -> str:
"""
Escape the first CTE name after the WITH keyword in case it is a reserved word,
e.g. WITH data AS (...) becomes WITH `data` AS (...)
"""
return re.sub(r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE)

@staticmethod
def _escape_table_or_view_name_at_create_statement(sql_query: str) -> str:
"""
Escape the target table or view name of a CREATE ... AS statement, because a name
containing hyphens (e.g. test-project.dataset.tbl) breaks sqllineage later on
"""
return re.sub(
r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)",
r"\1\2`\3`",
sql_query,
flags=re.IGNORECASE,
)

@staticmethod
def _escape_object_name_after_keyword_from(sql_query: str) -> str:
"""
Escape the object name following the FROM keyword, because a name containing
hyphens (e.g. test-project.dataset.tbl) breaks sqllineage later on
Note: occurrences of FROM inside the datetime function EXTRACT,
e.g. EXTRACT(DAY FROM ts), are intentionally left untouched
"""
return re.sub(
r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)",
r"\3`\4`",
sql_query,
flags=re.IGNORECASE,
)

def get_tables(self) -> List[str]:
return self.parser.get_tables()

def get_columns(self) -> List[str]:
return self.parser.get_columns()
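
Taken together, these helpers let queries that previously broke sqllineage pass through cleanly. A minimal usage sketch follows; the query is illustrative and combines cases that the unit tests below cover individually (hash comments plus a hyphenated source table name), so the expected output mirrors those tests rather than a separately verified run:

from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser

parser = BigQuerySQLParser(
    sql_query="""
    CREATE OR REPLACE VIEW `project.dataset.test_view` AS
    # a BigQuery-style comment, rewritten to '--' and then stripped before parsing
    SELECT *
    FROM test-project.dataset.src_table
    """
)

# Expected, per the tests below: ["test-project.dataset.src_table"]
print(parser.get_tables())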
59 changes: 33 additions & 26 deletions metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
@@ -5,6 +5,7 @@
from typing import Dict, List, Set

from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException

try:
import sqlparse
@@ -20,8 +21,10 @@

class SqlLineageSQLParserImpl:
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
_DATA_SWAP_TOKEN = "__d_a_t_a"
_ADMIN_SWAP_TOKEN = "__a_d_m_i_n"
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

@@ -32,18 +35,19 @@ def __init__(self, sql_query: str) -> None:
if "lateral flatten" in sql_query:
sql_query = sql_query[: sql_query.find("lateral flatten")]

# Replace reserved words that break SqlLineageParser
self.token_to_original: Dict[str, str] = {
self._DATE_SWAP_TOKEN: "date",
self._HOUR_SWAP_TOKEN: "hour",
self._TIMESTAMP_SWAP_TOKEN: "timestamp",
self._DATA_SWAP_TOKEN: "data",
self._ADMIN_SWAP_TOKEN: "admin",
}
for replacement, original in self.token_to_original.items():
sql_query = re.sub(
rf"(\b{original}\b)", rf"{replacement}", sql_query, flags=re.IGNORECASE
)

sql_query = re.sub(r"#([^ ])", r"# \1", sql_query)

# SqlLineageParser lowercases table names and we need to replace the Looker-specific token, which should be uppercased
sql_query = re.sub(
rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
@@ -57,36 +61,39 @@ def __init__(self, sql_query: str) -> None:
# Replace lookml templates with the variable otherwise sqlparse can't parse ${
sql_query = re.sub(r"(\${)(.+)(})", r"\2", sql_query)
if sql_query != original_sql_query:
logger.debug(f"rewrote original query {original_sql_query} as {sql_query}")
logger.debug(f"Rewrote original query {original_sql_query} as {sql_query}")

self._sql = sql_query

self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]
try:
self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]

with unittest.mock.patch(
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")

def get_tables(self) -> List[str]:
result: List[str] = list()
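
The net effect of wrapping the statement analysis in try/except is that a SQLLineageException no longer escapes the constructor of SqlLineageSQLParserImpl; it is logged instead. A rough sketch of the failure path, using a deliberately malformed query as an illustrative assumption:

from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl

# Hypothetical malformed query; previously a SQLLineageException raised inside
# LineageAnalyzer().analyze() would propagate out of the constructor.
parser = SqlLineageSQLParserImpl("SELECT * FROM")
# With the change above, construction completes and the error is reported via
# logger.error instead of being raised.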
119 changes: 119 additions & 0 deletions metadata-ingestion/tests/unit/test_bigquery_sql_lineage.py
@@ -0,0 +1,119 @@
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser


def test_bigquery_sql_lineage_hash_as_comment_sign_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
/*
HERE IS A STANDARD COMMENT BLOCK
THIS WILL NOT BREAK sqllineage
*/
CREATE OR REPLACE TABLE `project.dataset.trg_tbl` AS
#This, comment will not break sqllineage
SELECT foo
-- this comment will not break sqllineage either
# this comment will not break sqllineage either
FROM `project.dataset.src_tbl`
"""
)

assert parser.get_tables() == ["project.dataset.src_tbl"]


def test_bigquery_sql_lineage_keyword_data_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
WITH data AS (
SELECT
*,
'foo' AS bar
FROM `project.example_dataset.example_table`
)
SELECT * FROM data
"""
)

assert parser.get_tables() == ["project.example_dataset.example_table"]


def test_bigquery_sql_lineage_keyword_admin_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
WITH admin AS (
SELECT *
FROM `project.example_dataset.example_table`
)
SELECT * FROM admin
"""
)

assert parser.get_tables() == ["project.example_dataset.example_table"]


def test_bigquery_sql_lineage_cte_alias_as_keyword_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
CREATE OR REPLACE TABLE `project.dataset.test_table` AS
WITH map AS (
SELECT a.col_1,
b.col_2
FROM (
SELECT DISTINCT *
FROM (
SELECT col_1
FROM `project.dataset.source_table_a`
)
) a
JOIN `project.dataset.source_table_b` b
ON a.col_1 = b.col_1
)
SELECT *
FROM map
"""
)

assert parser.get_tables() == [
"project.dataset.source_table_a",
"project.dataset.source_table_b",
]


def test_bigquery_sql_lineage_create_or_replace_view_name_with_hyphens_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
CREATE OR REPLACE VIEW test-project.dataset.test_view AS
SELECT *
FROM project.dataset.src_table_a
UNION
SELECT * FROM `project.dataset.src_table_b`
"""
)

assert parser.get_tables() == [
"project.dataset.src_table_a",
"project.dataset.src_table_b",
]


def test_bigquery_sql_lineage_source_table_name_with_hyphens_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
CREATE OR REPLACE VIEW `project.dataset.test_view` AS
SELECT *
FROM test-project.dataset.src_table
"""
)

assert parser.get_tables() == ["test-project.dataset.src_table"]


def test_bigquery_sql_lineage_from_as_column_name_is_accepted():
parser = BigQuerySQLParser(
sql_query="""
CREATE OR REPLACE VIEW `project.dataset.test_view` AS
SELECT x.from AS col
FROM project.dataset.src_table AS x
"""
)

assert parser.get_tables() == ["project.dataset.src_table"]
(Diffs for the remaining changed files are not shown here.)
