
feat(tableau): ability to force extraction of table/column level lineage from SQL queries #9838

Merged 52 commits on Mar 21, 2024
Changes from all commits
Commits (52)
dbed477
feat(tableau): ability to force extraction of table/column level lina…
Feb 14, 2024
231b1d9
style(comments): added comments
Feb 14, 2024
a7f9601
Merge branch 'master' into master
alexs-101 Feb 14, 2024
5f172be
style(comments): fixed comments
Feb 14, 2024
50c8d1a
Merge branch 'master' into master
alexs-101 Feb 14, 2024
f18ec07
Merge branch 'master' into master
alexs-101 Feb 14, 2024
8dd7780
fix(pr): fixed logic related to schema_aware option
Feb 15, 2024
c972c03
Merge branch 'master' into master
alexs-101 Feb 15, 2024
c340015
Merge branch 'master' into master
alexs-101 Feb 15, 2024
076ce37
fix(pr): changed the logic disabling schema awareness during parsing
Feb 16, 2024
2f0eb02
fix(pr): moved inferring of tables schemas outside of SQL parsing logic
Feb 16, 2024
b463ec7
Merge branch 'master' into master
alexs-101 Feb 16, 2024
7abe4bc
fix(pr): remove patched version of TSQL dialect
Feb 17, 2024
982b0e1
fix(pr) restored accidentally removed golden test file
Feb 17, 2024
d1a1084
Merge remote-tracking branch 'upstream/master'
Feb 27, 2024
f1a05d9
fix(pr) updated golden file metadata-ingestion/tests/integration/tabl…
Feb 27, 2024
1bf4cf2
Merge branch 'master' into master
alexs-101 Feb 27, 2024
ff88a0e
Merge branch 'master' into master
alexs-101 Feb 27, 2024
4a0dff8
Merge branch 'master' into master
alexs-101 Feb 28, 2024
97babd8
Merge branch 'master' into master
alexs-101 Feb 28, 2024
930d361
Merge branch 'master' into master
alexs-101 Feb 28, 2024
acafb08
Merge branch 'master' into master
alexs-101 Feb 28, 2024
a55cc7d
feat(search): search access controls (#9892)
david-leifker Feb 28, 2024
0129ed7
feat(ingest/sql-parser): add alias for mariadb (#9956)
hsheth2 Feb 29, 2024
c7dc3f6
docs(ingest/lookml): update known discrepancy list (#9941)
hsheth2 Feb 29, 2024
1d64b6b
chore(vulnerability): Bumped up versions for vulnerability fix (#9929)
pankajmahato-visa Feb 29, 2024
11d526c
Revert "chore(vulnerability): Bumped up versions for vulnerability fi…
RyanHolstien Feb 29, 2024
bc200a1
bump(kafka-setup): client version bump (#9962)
david-leifker Feb 29, 2024
4aa1dc3
feat(ingest): throw codegen error on duplicate class names (#9960)
hsheth2 Mar 1, 2024
5a516f8
feat(docker): respect pip mirrors with uv (#9963)
hsheth2 Mar 1, 2024
abd811b
Openlineage endpoint and Spark Lineage Beta Plugin (#9870)
treff7es Mar 1, 2024
45a77d0
Merge branch 'master' into master
alexs-101 Mar 1, 2024
bed50d6
style(comments): fixed comments
alexs-101 Mar 5, 2024
684e1cc
fix(ingest/json-schema): adding support descriptions for array (#9757)
AvaniSiddhapuraAPT Mar 1, 2024
e229b40
fix(ingest/redshift): fix bug in lineage v2 table renames (#9967)
hsheth2 Mar 4, 2024
84e28f2
feat(ingest): speed up to_obj() and validate() (#9969)
hsheth2 Mar 4, 2024
b1041de
feat(ingest): fix fspath lint error (#9976)
hsheth2 Mar 5, 2024
88dfe6e
docs: archive old version before 0.12.0 & fix broken links (#9957)
yoonhyejin Mar 5, 2024
2d51807
fix(ui/editor): arrows change field when editing description (#9949)
gaurav2733 Mar 5, 2024
6c8c3f6
feat(ui/policies): add filter for Active/Inactive/All on policy page …
gaurav2733 Mar 5, 2024
9be09d3
fix(pr) improved _clean_tableau_query_parameters and added tests
Mar 5, 2024
66868b4
fix(pr) rename disable_schema_awarenes_during_parsing_of_sql_queries …
Mar 5, 2024
e85ee7d
style(comments) remove obsolete comments
Mar 5, 2024
7b3126f
fix(pr) rollback workaround for buggy sqlglot T-SQL dialect
Mar 5, 2024
c4f0589
fix(pr) cleanups
Mar 5, 2024
97dd6cf
fix(pr) switch from casefold() to lower() for consistency
Mar 5, 2024
ed71506
Merge branch 'master' into master
alexs-101 Mar 5, 2024
96d53ac
Merge branch 'master' into master
alexs-101 Mar 5, 2024
7ee7db0
Merge branch 'master' into master
alexs-101 Mar 7, 2024
d6e169d
Merge branch 'master' into master
alexs-101 Mar 11, 2024
396b727
Apply suggestions from code review
hsheth2 Mar 18, 2024
438598f
Merge branch 'master' into master
hsheth2 Mar 21, 2024
278 changes: 226 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py

Large diffs are not rendered by default.

25 changes: 22 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -2,7 +2,7 @@
import logging
from dataclasses import dataclass
from functools import lru_cache
- from typing import Dict, List, Optional, Tuple
+ from typing import Any, Dict, List, Optional, Tuple

from pydantic.fields import Field
from tableauserverclient import Server
@@ -762,8 +762,19 @@ def make_upstream_class(


def make_fine_grained_lineage_class(
- parsed_result: Optional[SqlParsingResult], dataset_urn: str
+ parsed_result: Optional[SqlParsingResult],
+ dataset_urn: str,
+ out_columns: List[Dict[Any, Any]],
) -> List[FineGrainedLineage]:
# 1) fine-grained lineage links are case-sensitive
# 2) parsed-out columns are always lower-cased
# 3) the corresponding Custom SQL output columns can be in any case (lower/upper/mixed)
#
# we need a map between 2) and 3) that is used when building column-level lineage links (see below)
out_columns_map = {
col.get(c.NAME, "").lower(): col.get(c.NAME, "") for col in out_columns
}

fine_grained_lineages: List[FineGrainedLineage] = []

if parsed_result is None:
@@ -775,7 +786,15 @@ def make_fine_grained_lineage_class(

for cll_info in cll:
downstream = (
- [builder.make_schema_field_urn(dataset_urn, cll_info.downstream.column)]
+ [
+ builder.make_schema_field_urn(
+ dataset_urn,
+ out_columns_map.get(
+ cll_info.downstream.column.lower(),
+ cll_info.downstream.column,
+ ),
+ )
+ ]
if cll_info.downstream is not None
and cll_info.downstream.column is not None
else []
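Why the map above matters: fine-grained lineage URNs are case-sensitive, the SQL parser reports columns lower-cased, but Tableau's Custom SQL output columns keep their original casing. A minimal illustration of the lookup; the column dicts and the "name" key here stand in for Tableau's metadata shape (c.NAME in the diff):

# Tableau reports output columns with their original casing.
out_columns = [{"name": "Customer_ID"}, {"name": "OrderTotal"}]

# Same construction as in the diff: lower-cased name -> original name.
out_columns_map = {col.get("name", "").lower(): col.get("name", "") for col in out_columns}

# The parser emits lower-cased columns; restore Tableau's casing before
# building the schema-field URN, falling back to the parsed name as-is.
parsed_col = "customer_id"
assert out_columns_map.get(parsed_col, parsed_col) == "Customer_ID"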
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/sql_parsing_result_utils.py
@@ -0,0 +1,23 @@
from typing import Dict, Set

from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, Urn


def transform_parsing_result_to_in_tables_schemas(
parsing_result: SqlParsingResult,
) -> Dict[Urn, Set[str]]:
table_urn_to_schema_map: Dict[str, Set[str]] = (
{it: set() for it in parsing_result.in_tables}
if parsing_result.in_tables
else {}
)

if parsing_result.column_lineage:
for cli in parsing_result.column_lineage:
for upstream in cli.upstreams:
if upstream.table in table_urn_to_schema_map:
table_urn_to_schema_map[upstream.table].add(upstream.column)
else:
table_urn_to_schema_map[upstream.table] = {upstream.column}

return table_urn_to_schema_map
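This new helper inverts the parser's output: for every upstream table URN it accumulates the set of columns referenced in column lineage, with tables that appear only in in_tables mapped to an empty set. Presumably this is what lets the Tableau source infer minimal schemas for tables it could not resolve; the unit tests at the end of this diff pin down the exact behavior.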
metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -529,6 +529,9 @@ def _schema_aware_fuzzy_column_resolve(

# Parse the column name out of the node name.
# Sqlglot calls .sql(), so we have to do the inverse.
if node.name == "*":
continue

normalized_col = sqlglot.parse_one(node.name).this.name
if node.subfield:
normalized_col = f"{normalized_col}.{node.subfield}"
@@ -834,6 +837,7 @@ def _sqlglot_lineage_inner(
# Fetch schema info for the relevant tables.
table_name_urn_mapping: Dict[_TableName, str] = {}
table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {}

for table in tables | modified:
# For select statements, qualification will be a no-op. For other statements, this
# is where the qualification actually happens.
@@ -1016,8 +1020,9 @@ def create_lineage_sql_parsed_result(
env: str,
default_schema: Optional[str] = None,
graph: Optional[DataHubGraph] = None,
schema_aware: bool = True,
) -> SqlParsingResult:
- if graph:
+ if graph and schema_aware:
needs_close = False
schema_resolver = graph._make_schema_resolver(
platform=platform,
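With the new schema_aware flag, a caller can skip DataHub schema resolution even when a graph client is available. A hedged sketch of such a call; the leading parameters of create_lineage_sql_parsed_result are cut off in the hunk above, so the query/platform argument names here are assumptions:

result = create_lineage_sql_parsed_result(
    query="SELECT id, amount FROM orders",  # assumed parameter name
    platform="mssql",                       # assumed parameter name
    env="PROD",
    default_schema="dbo",
    graph=graph,         # a DataHubGraph client, or None
    schema_aware=False,  # new: parse without resolving schemas from DataHub
)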
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
@@ -16,6 +16,11 @@ def _get_dialect_str(platform: str) -> str:
return "tsql"
elif platform == "athena":
return "trino"
# TODO: define a Salesforce SOQL dialect
# Temporary workaround: treat SOQL as the Databricks dialect.
# At least this allows parsing simple SQL queries and building lineage for them.
elif platform == "salesforce":
return "databricks"
elif platform in {"mysql", "mariadb"}:
# In sqlglot v20+, MySQL is now case-sensitive by default, which is the
# default behavior on Linux. However, MySQL's default case sensitivity
@@ -31,6 +36,7 @@ def _get_dialect_str(platform: str) -> str:
def get_dialect(platform: DialectOrStr) -> sqlglot.Dialect:
if isinstance(platform, sqlglot.Dialect):
return platform

return sqlglot.Dialect.get_or_raise(_get_dialect_str(platform))


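A small check of the Salesforce workaround above; per the mapping, get_dialect should hand back sqlglot's Databricks dialect until a dedicated SOQL dialect exists:

from datahub.sql_parsing.sqlglot_utils import get_dialect

dialect = get_dialect("salesforce")
# Expected to be sqlglot's Databricks dialect, per the mapping above.
print(dialect)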
@@ -42870,6 +42870,38 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.order_items,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "tableau-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.sellers,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "tableau-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:external,sample - superstore%2C %28new%29.xls.orders,PROD)",
Expand Down
metadata-ingestion/tests/integration/tableau/test_tableau.py
@@ -319,6 +319,8 @@ def test_tableau_cll_ingest(pytestconfig, tmp_path, mock_datahub_graph):
new_pipeline_config: Dict[Any, Any] = {
**config_source_default,
"extract_lineage_from_unsupported_custom_sql_queries": True,
"force_extraction_of_lineage_from_custom_sql_queries": False,
"sql_parsing_disable_schema_awareness": False,
"extract_column_level_lineage": True,
}

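These two flags are the PR's user-facing surface: force_extraction_of_lineage_from_custom_sql_queries forces table/column lineage to be parsed out of Custom SQL even when Tableau supplies its own lineage, and sql_parsing_disable_schema_awareness presumably flows through to the new schema_aware=False path in create_lineage_sql_parsed_result shown earlier.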
@@ -834,6 +836,7 @@ def test_tableau_unsupported_csql(mock_datahub_graph):
"connectionType": "bigquery",
},
},
out_columns=[],
)

mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)
@@ -0,0 +1,67 @@
from datahub.sql_parsing.sql_parsing_result_utils import (
transform_parsing_result_to_in_tables_schemas,
)
from datahub.sql_parsing.sqlglot_lineage import (
ColumnLineageInfo,
ColumnRef,
DownstreamColumnRef,
SqlParsingResult,
)


def test_transform_parsing_result_to_in_tables_schemas__empty_parsing_result():
parsing_result = SqlParsingResult(in_tables=[], out_tables=[], column_lineage=None)

in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
assert not in_tables_schema


def test_transform_parsing_result_to_in_tables_schemas__in_tables_only():
parsing_result = SqlParsingResult(
in_tables=["table_urn1", "table_urn2", "table_urn3"],
out_tables=[],
column_lineage=None,
)

in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
assert in_tables_schema == {
"table_urn1": set(),
"table_urn2": set(),
"table_urn3": set(),
}


def test_transform_parsing_result_to_in_tables_schemas__in_tables_and_column_linage():
parsing_result = SqlParsingResult(
in_tables=["table_urn1", "table_urn2", "table_urn3"],
out_tables=[],
column_lineage=[
ColumnLineageInfo(
downstream=DownstreamColumnRef(column="out_col1"),
upstreams=[
ColumnRef(table="table_urn1", column="col11"),
],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(column="out_col2"),
upstreams=[
ColumnRef(table="table_urn2", column="col21"),
ColumnRef(table="table_urn2", column="col22"),
],
),
ColumnLineageInfo(
downstream=DownstreamColumnRef(column="out_col3"),
upstreams=[
ColumnRef(table="table_urn1", column="col12"),
ColumnRef(table="table_urn2", column="col23"),
],
),
],
)

in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
assert in_tables_schema == {
"table_urn1": {"col11", "col12"},
"table_urn2": {"col21", "col22", "col23"},
"table_urn3": set(),
}
123 changes: 123 additions & 0 deletions metadata-ingestion/tests/unit/test_tableau_source.py
@@ -0,0 +1,123 @@
import pytest

from datahub.ingestion.source.tableau import TableauSource


def test_tableau_source_unescapes_lt():
res = TableauSource._clean_tableau_query_parameters(
"select * from t where c1 << 135"
)

assert res == "select * from t where c1 < 135"


def test_tableau_source_unescapes_gt():
res = TableauSource._clean_tableau_query_parameters(
"select * from t where c1 >> 135"
)

assert res == "select * from t where c1 > 135"


def test_tableau_source_unescapes_gte():
res = TableauSource._clean_tableau_query_parameters(
"select * from t where c1 >>= 135"
)

assert res == "select * from t where c1 >= 135"


def test_tableau_source_unescapes_lte():
res = TableauSource._clean_tableau_query_parameters(
"select * from t where c1 <<= 135"
)

assert res == "select * from t where c1 <= 135"


def test_tableau_source_doesnt_touch_not_escaped():
res = TableauSource._clean_tableau_query_parameters(
"select * from t where c1 < 135 and c2 > 15"
)

assert res == "select * from t where c1 < 135 and c2 > 15"


TABLEAU_PARAMS = [
"<Parameters.MyParam>",
"<Parameters.MyParam_1>",
"<Parameters.My Param _ 1>",
"<Parameters.My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
"<[Parameters].MyParam>",
"<[Parameters].MyParam_1>",
"<[Parameters].My Param _ 1>",
"<[Parameters].My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
"<Parameters.[MyParam]>",
"<Parameters.[MyParam_1]>",
"<Parameters.[My Param _ 1]>",
"<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
"<[Parameters].[MyParam]>",
"<[Parameters].[MyParam_1]>",
"<[Parameters].[My Param _ 1]>",
"<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
"<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
"<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
]


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
f"select * from t where c1 = {p} and c2 = {p} and c3 = 7"
)
== "select * from t where c1 = 1 and c2 = 1 and c3 = 7"
)


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
f"select * from t where c1 << {p} and c2<<{p} and c3 >> {p} and c4>>{p} or {p} >> c1 and {p}>>c2 and {p} << c3 and {p}<<c4"
)
== "select * from t where c1 < 1 and c2<1 and c3 > 1 and c4>1 or 1 > c1 and 1>c2 and 1 < c3 and 1<c4"
)


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
assert (
TableauSource._clean_tableau_query_parameters(
f"select * from t where c1 <<= {p} and c2<<={p} and c3 >>= {p} and c4>>={p} or {p} >>= c1 and {p}>>=c2 and {p} <<= c3 and {p}<<=c4"
)
== "select * from t where c1 <= 1 and c2<=1 and c3 >= 1 and c4>=1 or 1 >= c1 and 1>=c2 and 1 <= c3 and 1<=c4"
)


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
assert (
TableauSource._clean_tableau_query_parameters(
f"select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = {p} and t1.c11 = 123 + {p}"
)
== "select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = 1 and t1.c11 = 123 + 1"
)


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
assert (
TableauSource._clean_tableau_query_parameters(
f"select myudf1(c1, {p}, c2) / myudf2({p}) > ({p} + 3 * {p} * c5) * {p} - c4"
)
== "select myudf1(c1, 1, c2) / myudf2(1) > (1 + 3 * 1 * c5) * 1 - c4"
)


@pytest.mark.parametrize("p", TABLEAU_PARAMS)
def test_tableau_source_cleanups_tableau_parameters_in_udfs(p):
assert (
TableauSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
== "select myudf(1) from t"
)
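The implementation under test lives in the collapsed tableau.py diff above ("Large diffs are not rendered by default"), so here is a standalone sketch of the behavior these tests pin down; the replacement order and the parameter regex are illustrative assumptions, not the exact implementation:

import re

def clean_tableau_query_parameters(query: str) -> str:
    # Tableau doubles comparison operators in Custom SQL; undo that first,
    # handling the "-equals" forms before the plain ones so "<<=" -> "<=".
    for escaped, plain in (("<<=", "<="), (">>=", ">="), ("<<", "<"), (">>", ">")):
        query = query.replace(escaped, plain)
    # Replace <Parameters.X> / <[Parameters].[X]> placeholders with a literal 1
    # so the query becomes parseable SQL.
    return re.sub(r"<\[?[Pp]arameters\]?\.(\[[^\]]+\]|[^>]+)>", "1", query)

For example, clean_tableau_query_parameters("select * from t where c1 << <Parameters.MyParam>") would yield "select * from t where c1 < 1".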