From ddb38e7448d8f091d86c8f7711367c885f4bfdde Mon Sep 17 00:00:00 2001 From: Shubham Jagtap <132359390+shubhamjagtap639@users.noreply.github.com> Date: Tue, 7 May 2024 13:47:56 +0530 Subject: [PATCH] fix(ingestion/tableau): Fix tableau custom sql lineage gap (#10359) --- .../src/datahub/ingestion/source/tableau.py | 7 +- .../ingestion/source/tableau_common.py | 2 + .../tableau/test_tableau_ingest.py | 108 ++++++++++++------ 3 files changed, 80 insertions(+), 37 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 23a75745698d9..d6f63ab385f52 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -1693,7 +1693,7 @@ def parse_custom_sql( ) -> Optional["SqlParsingResult"]: database_info = datasource.get(c.DATABASE) or { c.NAME: c.UNKNOWN.lower(), - c.CONNECTION_TYPE: "databricks", + c.CONNECTION_TYPE: datasource.get(c.CONNECTION_TYPE), } if ( @@ -1703,7 +1703,10 @@ def parse_custom_sql( logger.debug(f"datasource {datasource_urn} is not created from custom sql") return None - if c.NAME not in database_info or c.CONNECTION_TYPE not in database_info: + if ( + database_info.get(c.NAME) is None + or database_info.get(c.CONNECTION_TYPE) is None + ): logger.debug( f"database information is missing from datasource {datasource_urn}" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 98536472c5f61..fcfa434e00fee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -324,6 +324,7 @@ class MetadataQueryException(Exception): totalCount } } + connectionType database{ name connectionType @@ -827,6 +828,7 @@ def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]: # are missing from api result. "isUnsupportedCustomSql": True if not custom_sql.get("tables") else False, "query": custom_sql.get("query"), + "connectionType": custom_sql.get("connectionType"), "columns": custom_sql.get("columns"), "tables": custom_sql.get("tables"), "database": custom_sql.get("database"), diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index cccf79ccbd8e0..2b122897a333f 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -16,6 +16,7 @@ ) from datahub.configuration.source_common import DEFAULT_ENV +from datahub.emitter.mce_builder import make_schema_field_urn from datahub.ingestion.run.pipeline import Pipeline, PipelineContext from datahub.ingestion.source.tableau import TableauConfig, TableauSource from datahub.ingestion.source.tableau_common import ( @@ -24,10 +25,12 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( DatasetLineageType, + FineGrainedLineage, + FineGrainedLineageDownstreamType, + FineGrainedLineageUpstreamType, UpstreamLineage, ) from datahub.metadata.schema_classes import MetadataChangeProposalClass, UpstreamClass -from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult from tests.test_helpers import mce_helpers, test_connection_helpers from tests.test_helpers.state_helpers import ( get_current_checkpoint_from_pipeline, @@ -805,55 +808,90 @@ def test_tableau_signout_timeout(pytestconfig, tmp_path, mock_datahub_graph): ) -def test_tableau_unsupported_csql(mock_datahub_graph): +def test_tableau_unsupported_csql(): context = PipelineContext(run_id="0", pipeline_name="test_tableau") - context.graph = mock_datahub_graph - config = TableauConfig.parse_obj(config_source_default.copy()) + config_dict = config_source_default.copy() + del config_dict["stateful_ingestion"] + config = TableauConfig.parse_obj(config_dict) config.extract_lineage_from_unsupported_custom_sql_queries = True config.lineage_overrides = TableauLineageOverrides( database_override_map={"production database": "prod"} ) - with mock.patch( - "datahub.ingestion.source.tableau.create_lineage_sql_parsed_result", - return_value=SqlParsingResult( - in_tables=[ - "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)" - ], - out_tables=[], - column_lineage=None, - ), + def test_lineage_metadata( + lineage, expected_entity_urn, expected_upstream_table, expected_cll ): - source = TableauSource(config=config, ctx=context) - - lineage = source._create_lineage_from_unsupported_csql( - csql_urn="urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)", - csql={ - "query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1", - "isUnsupportedCustomSql": "true", - "database": { - "name": "my-bigquery-project", - "connectionType": "bigquery", - }, - }, - out_columns=[], - ) - mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata) - assert mcp.aspect == UpstreamLineage( upstreams=[ UpstreamClass( - dataset="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.userdetail,PROD)", + dataset=expected_upstream_table, type=DatasetLineageType.TRANSFORMED, ) ], - fineGrainedLineages=[], - ) - assert ( - mcp.entityUrn - == "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)" + fineGrainedLineages=[ + FineGrainedLineage( + upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, + upstreams=[ + make_schema_field_urn(expected_upstream_table, upstream_column) + ], + downstreamType=FineGrainedLineageDownstreamType.FIELD, + downstreams=[ + make_schema_field_urn(expected_entity_urn, downstream_column) + ], + ) + for upstream_column, downstream_column in expected_cll.items() + ], ) + assert mcp.entityUrn == expected_entity_urn + + csql_urn = "urn:li:dataset:(urn:li:dataPlatform:tableau,09988088-05ad-173c-a2f1-f33ba3a13d1a,PROD)" + expected_upstream_table = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_bigquery_project.invent_dw.UserDetail,PROD)" + expected_cll = { + "user_id": "user_id", + "source": "source", + "user_source": "user_source", + } + + source = TableauSource(config=config, ctx=context) + + lineage = source._create_lineage_from_unsupported_csql( + csql_urn=csql_urn, + csql={ + "query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM invent_dw.UserDetail ) source_user WHERE rank_ = 1", + "isUnsupportedCustomSql": "true", + "connectionType": "bigquery", + "database": { + "name": "my_bigquery_project", + "connectionType": "bigquery", + }, + }, + out_columns=[], + ) + test_lineage_metadata( + lineage=lineage, + expected_entity_urn=csql_urn, + expected_upstream_table=expected_upstream_table, + expected_cll=expected_cll, + ) + + # With database as None + lineage = source._create_lineage_from_unsupported_csql( + csql_urn=csql_urn, + csql={ + "query": "SELECT user_id, source, user_source FROM (SELECT *, ROW_NUMBER() OVER (partition BY user_id ORDER BY __partition_day DESC) AS rank_ FROM my_bigquery_project.invent_dw.UserDetail ) source_user WHERE rank_ = 1", + "isUnsupportedCustomSql": "true", + "connectionType": "bigquery", + "database": None, + }, + out_columns=[], + ) + test_lineage_metadata( + lineage=lineage, + expected_entity_urn=csql_urn, + expected_upstream_table=expected_upstream_table, + expected_cll=expected_cll, + ) @freeze_time(FROZEN_TIME)