Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery): improve handling of extracted audit log sql queries #4735

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ def bigquery_audit_metadata_query_template(
"""
Receives a dataset (with project specified) and returns a query template that is used to query exported
AuditLogs containing protoPayloads of type BigQueryAuditMetadata.
Include only those that:
- have been completed (jobStatus.jobState = "DONE")
- do not contain errors (jobStatus.errorResults is none)
:param dataset: the dataset to query against in the form of $PROJECT.$DATASET
:param use_date_sharded_tables: whether to read from date sharded audit log tables or time partitioned audit log
tables
Expand Down Expand Up @@ -213,6 +216,7 @@ def bigquery_audit_metadata_query_template(
AND timestamp < "{end_time}"
AND protopayload_auditlog.serviceName="bigquery.googleapis.com"
AND JSON_EXTRACT_SCALAR(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.jobState") = "DONE"
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobStatus.errorResults") IS NULL
AND JSON_EXTRACT(protopayload_auditlog.metadataJson, "$.jobChange.job.jobConfig.queryConfig") IS NOT NULL;
"""

Expand Down
96 changes: 70 additions & 26 deletions metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Dict, List, Set

from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException

try:
import sqlparse
Expand All @@ -20,8 +21,10 @@

class SqlLineageSQLParserImpl:
_DATE_SWAP_TOKEN = "__d_a_t_e"
_HOUR_SWAP_TOKEN = "__h_o_u_r"
_TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
_DATA_SWAP_TOKEN = "__d_a_t_a"
_ADMIN_SWAP_TOKEN = "__a_d_m_i_n"
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"

Expand All @@ -32,18 +35,59 @@ def __init__(self, sql_query: str) -> None:
if "lateral flatten" in sql_query:
sql_query = sql_query[: sql_query.find("lateral flatten")]

# BigQuery can use # as a comment symbol and that may break the parsing
# if the comment starts without a space sign after #
# and the comment is written inside SQL DDL statement (e.g., after CREATE...AS)
sql_query = re.sub(r"#([^ ])", r"# \1", sql_query, flags=re.IGNORECASE)

# Wrap calls to field '<table_alias>.from' in ` so it would not be taken as a reserved keyword
sql_query = re.sub(r"(\w*\.from)", r"`\1`", sql_query, flags=re.IGNORECASE)

# Apply sqlparser formatting to get rid of comments and reindent keywords
anshbansal marked this conversation as resolved.
Show resolved Hide resolved
# which should remove some potential inconsistencies in parsing output
sql_query = sqlparse.format(
sql_query.strip(),
reindent_aligned=True,
strip_comments=True,
)

# SqlLineageParser requires table/view names to be wrapped in quotes or backticks
# Add ` signs before and after supposed object name that comes right after reserved word FROM
vgaidass marked this conversation as resolved.
Show resolved Hide resolved
# note 1: this excludes date/time/datetime extract functions like EXTRACT DATE FROM...
# note 2: this includes adding ` signs to CTE aliases
sql_query = re.sub(
r"(?<!day\s)(?<!(date|time|hour|week|year)\s)(?<!month\s)(?<!(second|minute)\s)(?<!quarter\s)(?<!\.)(from\s)([^`\s()]+)",
vgaidass marked this conversation as resolved.
Show resolved Hide resolved
r"\3`\4`",
sql_query,
flags=re.IGNORECASE,
)

# Add ` signs before and after table/view name at the beginning of SQL DDL statement (e.g. CREATE/CREATE OR REPLACE...AS)
sql_query = re.sub(
r"(create.*\s)(table\s|view\s)([^`\s()]+)(?=\sas)",
r"\1\2`\3`",
sql_query,
flags=re.IGNORECASE,
vgaidass marked this conversation as resolved.
Show resolved Hide resolved
)

# Add ` signs before and after CTE alias name at WITH
sql_query = re.sub(
r"(with\s)([^`\s()]+)", r"\1`\2`", sql_query, flags=re.IGNORECASE
vgaidass marked this conversation as resolved.
Show resolved Hide resolved
)

# Replace reserved words that break SqlLineageParser
self.token_to_original: Dict[str, str] = {
self._DATE_SWAP_TOKEN: "date",
self._HOUR_SWAP_TOKEN: "hour",
self._TIMESTAMP_SWAP_TOKEN: "timestamp",
self._DATA_SWAP_TOKEN: "data",
self._ADMIN_SWAP_TOKEN: "admin",
}
for replacement, original in self.token_to_original.items():
sql_query = re.sub(
rf"(\b{original}\b)", rf"{replacement}", sql_query, flags=re.IGNORECASE
)

sql_query = re.sub(r"#([^ ])", r"# \1", sql_query)

# SqlLineageParser lowercases table names, so we need to replace the Looker-specific token, which should be uppercased
sql_query = re.sub(
rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
Expand All @@ -57,36 +101,36 @@ def __init__(self, sql_query: str) -> None:
# Replace lookml templates with the variable otherwise sqlparse can't parse ${
sql_query = re.sub(r"(\${)(.+)(})", r"\2", sql_query)
if sql_query != original_sql_query:
logger.debug(f"rewrote original query {original_sql_query} as {sql_query}")
logger.debug(f"Rewrote original query {original_sql_query} as {sql_query}")

self._sql = sql_query

self._stmt = [
s
for s in sqlparse.parse(
# first apply sqlparser formatting just to get rid of comments, which cause
# inconsistencies in parsing output
sqlparse.format(
self._sql.strip(),
strip_comments=True,
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]
try:
self._stmt = [
s
for s in sqlparse.parse(
sqlparse.format(
self._sql.strip(),
use_space_around_operators=True,
),
)
if s.token_first(skip_cm=True)
]

with unittest.mock.patch(
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
"sqllineage.core.handlers.source.SourceHandler.end_of_query_cleanup",
datahub.utilities.sqllineage_patch.end_of_query_cleanup_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
with unittest.mock.patch(
"sqllineage.core.holders.SubQueryLineageHolder.add_column_lineage",
datahub.utilities.sqllineage_patch.add_column_lineage_patch,
):
self._stmt_holders = [
LineageAnalyzer().analyze(stmt) for stmt in self._stmt
]
self._sql_holder = SQLLineageHolder.of(*self._stmt_holders)
except SQLLineageException as e:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")

def get_tables(self) -> List[str]:
result: List[str] = list()
Expand Down
182 changes: 180 additions & 2 deletions metadata-ingestion/tests/unit/test_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,188 @@ def test_with_keyword_data():
SELECT
*,
'foo' AS bar
FROM `project.example_dataset.another_example_table_vw`
FROM `project.example_dataset.example_table`
)
SELECT * FROM data
"""
)

assert parser.get_tables() == ["project.example_dataset.another_example_table_vw"]
assert parser.get_tables() == ["project.example_dataset.example_table"]


def test_with_keyword_admin():
    """Expect exactly one table for a query whose CTE is named ``admin``.

    The only table reported must be the backticked table read inside the CTE.
    """
    query = """
WITH admin AS (
SELECT *
FROM `project.example_dataset.example_table`
)
SELECT * FROM admin
"""
    expected_tables = ["project.example_dataset.example_table"]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_create_or_replace_table_name_not_wrapped_in_backticks():
    """Check table extraction for CREATE OR REPLACE TABLE with unquoted names.

    The statement mixes unquoted and backticked source tables and contains many
    ``EXTRACT(<part> FROM time_field)`` expressions; the expected result is
    exactly the three source tables, in the listed order.
    """
    query = """
CREATE OR REPLACE TABLE project.dataset.test_table AS
WITH cte AS (
SELECT
*,
EXTRACT(MICROSECOND FROM time_field) AS col_1,
EXTRACT(MILLISECOND FROM time_field) AS col_2,
EXTRACT(SECOND FROM time_field) AS col_3,
EXTRACT(MINUTE FROM time_field) AS col_4,
EXTRACT(HOUR FROM time_field) AS col_5,
EXTRACT(DAYOFWEEK FROM time_field) AS col_6,
EXTRACT(DAY FROM time_field) AS col_7,
EXTRACT(DAYOFYEAR FROM time_field) AS col_8,
EXTRACT(WEEK FROM time_field) AS col_9,
EXTRACT(WEEK FROM time_field) AS col_10,
EXTRACT(ISOWEEK FROM time_field) AS col_11,
EXTRACT(MONTH FROM time_field) AS col_12,
EXTRACT(QUARTER FROM time_field) AS col_13,
EXTRACT(YEAR FROM time_field) AS col_14,
EXTRACT(ISOYEAR FROM time_field) AS col_15,
EXTRACT(DATE FROM time_field) AS col_16,
EXTRACT(TIME FROM time_field) AS col_17
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
    expected_tables = [
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
        "project.dataset.src_table_c",
    ]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_create_or_replace_view_name_not_wrapped_in_backticks():
    """Check table extraction for CREATE OR REPLACE VIEW with an unquoted view name.

    Expected result: exactly the three source tables (two unquoted, one
    backticked), in the listed order.
    """
    query = """
CREATE OR REPLACE VIEW project.dataset.test_view AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
    expected_tables = [
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
        "project.dataset.src_table_c",
    ]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_create_table_name_not_wrapped_in_backticks():
    """Check table extraction for a plain CREATE TABLE with an unquoted table name.

    Expected result: exactly the three source tables (two unquoted, one
    backticked), in the listed order.
    """
    query = """
CREATE TABLE project.dataset.test_table AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
    expected_tables = [
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
        "project.dataset.src_table_c",
    ]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_create_view_name_not_wrapped_in_backticks():
    """Check table extraction for a plain CREATE VIEW with an unquoted view name.

    Expected result: exactly the three source tables (two unquoted, one
    backticked), in the listed order.
    """
    query = """
CREATE VIEW project.dataset.test_view AS
WITH cte AS (
SELECT
*,
'foo' AS bar
FROM project.dataset.src_table_a
)
SELECT * FROM cte
UNION
SELECT *
FROM project.dataset.src_table_b
UNION
SELECT * FROM `project.dataset.src_table_c`
"""
    expected_tables = [
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
        "project.dataset.src_table_c",
    ]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_from_as_column_name_is_escaped():
    """Check that a column referenced as ``x.from`` does not disturb table extraction.

    Expected result: only the single aliased source table is reported.
    """
    query = """
CREATE TABLE project.dataset.test_table AS
SELECT x.from AS col
FROM project.dataset.src_table AS x
"""
    expected_tables = [
        "project.dataset.src_table",
    ]

    lineage = SqlLineageSQLParser(sql_query=query)

    assert lineage.get_tables() == expected_tables


def test_sqllineage_sql_parser_cte_alias_near_with_keyword_is_escaped():
    """Check table extraction when the CTE right after WITH is named ``map``.

    Only a subset relation is asserted: the three backticked source tables
    must be among the reported tables; extra entries are tolerated.
    """
    query = """
create or replace view `project.dataset.test_view` as
with map as (
select col_1
col_2
from `project.dataset.src_table_a` a
join `project.dataset.src_table_b` b
on a.col_1 = b.col_2
),
cte_2 as (
select * from `project.dataset.src_table_c`
)
select * from map
union all
select * from cte_2
"""
    required_sources = {
        "project.dataset.src_table_a",
        "project.dataset.src_table_b",
        "project.dataset.src_table_c",
    }

    lineage = SqlLineageSQLParser(sql_query=query)

    # results would (somehow) also contain "cte_2" which should be considered a subquery rather than a table
    assert required_sources.issubset(lineage.get_tables())