Skip to content

Commit

Permalink
fix(ingestion/tableau): Fix tableau custom sql lineage gap (#10359)
Browse files Browse the repository at this point in the history
  • Loading branch information
shubhamjagtap639 committed May 7, 2024
1 parent 360445e commit ddb38e7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 37 deletions.
7 changes: 5 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ def parse_custom_sql(
) -> Optional["SqlParsingResult"]:
database_info = datasource.get(c.DATABASE) or {
c.NAME: c.UNKNOWN.lower(),
c.CONNECTION_TYPE: "databricks",
c.CONNECTION_TYPE: datasource.get(c.CONNECTION_TYPE),
}

if (
Expand All @@ -1703,7 +1703,10 @@ def parse_custom_sql(
logger.debug(f"datasource {datasource_urn} is not created from custom sql")
return None

if c.NAME not in database_info or c.CONNECTION_TYPE not in database_info:
if (
database_info.get(c.NAME) is None
or database_info.get(c.CONNECTION_TYPE) is None
):
logger.debug(
f"database information is missing from datasource {datasource_urn}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class MetadataQueryException(Exception):
totalCount
}
}
connectionType
database{
name
connectionType
Expand Down Expand Up @@ -827,6 +828,7 @@ def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]:
# are missing from api result.
"isUnsupportedCustomSql": True if not custom_sql.get("tables") else False,
"query": custom_sql.get("query"),
"connectionType": custom_sql.get("connectionType"),
"columns": custom_sql.get("columns"),
"tables": custom_sql.get("tables"),
"database": custom_sql.get("database"),
Expand Down
108 changes: 73 additions & 35 deletions metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

from datahub.configuration.source_common import DEFAULT_ENV
from datahub.emitter.mce_builder import make_schema_field_urn
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.tableau import TableauConfig, TableauSource
from datahub.ingestion.source.tableau_common import (
Expand All @@ -24,10 +25,12 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
UpstreamLineage,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
Expand Down Expand Up @@ -805,55 +808,90 @@ def test_tableau_signout_timeout(pytestconfig, tmp_path, mock_datahub_graph):
)


def test_tableau_unsupported_csql(mock_datahub_graph):
def test_tableau_unsupported_csql():
context = PipelineContext(run_id="0", pipeline_name="test_tableau")
context.graph = mock_datahub_graph
config = TableauConfig.parse_obj(config_source_default.copy())
config_dict = config_source_default.copy()
del config_dict["stateful_ingestion"]
config = TableauConfig.parse_obj(config_dict)
config.extract_lineage_from_unsupported_custom_sql_queries = True
config.lineage_overrides = TableauLineageOverrides(
database_override_map={"production database": "prod"}
)

with mock.patch(
"datahub.ingestion.source.tableau.create_lineage_sql_parsed_result",
return_value=SqlParsingResult(
in_tables=[
"urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)"
],
out_tables=[],
column_lineage=None,
),
def test_lineage_metadata(
lineage, expected_entity_urn, expected_upstream_table, expected_cll
):
source = TableauSource(config=config, ctx=context)

lineage = source._create_lineage_from_unsupported_csql(
csql_urn="urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)",
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"database": {
"name": "my-bigquery-project",
"connectionType": "bigquery",
},
},
out_columns=[],
)

mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata)

assert mcp.aspect == UpstreamLineage(
upstreams=[
UpstreamClass(
dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)",
dataset=expected_upstream_table,
type=DatasetLineageType.TRANSFORMED,
)
],
fineGrainedLineages=[],
)
assert (
mcp.entityUrn
== "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
fineGrainedLineages=[
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
make_schema_field_urn(expected_upstream_table, upstream_column)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
make_schema_field_urn(expected_entity_urn, downstream_column)
],
)
for upstream_column, downstream_column in expected_cll.items()
],
)
assert mcp.entityUrn == expected_entity_urn

csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)"
expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)"
expected_cll = {
"user_id": "user_id",
"source": "source",
"user_source": "user_source",
}

source = TableauSource(config=config, ctx=context)

lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": {
"name": "my_bigquery_project",
"connectionType": "bigquery",
},
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)

# With database as None
lineage = source._create_lineage_from_unsupported_csql(
csql_urn=csql_urn,
csql={
"query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1",
"isUnsupportedCustomSql": "true",
"connectionType": "bigquery",
"database": None,
},
out_columns=[],
)
test_lineage_metadata(
lineage=lineage,
expected_entity_urn=csql_urn,
expected_upstream_table=expected_upstream_table,
expected_cll=expected_cll,
)


@freeze_time(FROZEN_TIME)
Expand Down

0 comments on commit ddb38e7

Please sign in to comment.