fix(ingestion/powerbi): fix for databricks lineage m-query pattern #11462

Merged · 14 commits · Oct 7, 2024
@@ -44,3 +44,12 @@ class DataAccessFunctionDetail:
arg_list: Tree
data_access_function_name: str
identifier_accessor: Optional[IdentifierAccessor]


@dataclass
class ReferencedTable:
warehouse: str
catalog: Optional[str]
database: str
schema: str
table: str
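
For reference, a hedged sketch of how this new dataclass might be populated for a Databricks Unity Catalog table (all values below are illustrative, not taken from this PR):

ref = ReferencedTable(
    warehouse="adb-1234567890123456.7.azuredatabricks.net",  # hypothetical workspace FQDN
    catalog="main",
    database="main",  # database and catalog names coincide in M-Query
    schema="sales",
    table="orders",
)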
@@ -59,11 +59,15 @@ def get_upstream_tables(
) -> List[resolver.Lineage]:

if table.expression is None:
logger.debug(f"Expression is none for table {table.full_name}")
logger.debug(f"There is no M-Query expression in table {table.full_name}")
return []

parameters = parameters or {}

logger.debug(
f"Processing {table.full_name} m-query expression for lineage extraction. Expression = {table.expression}"
)

try:
parse_tree: Tree = _parse_expression(table.expression)

@@ -73,23 +77,31 @@
if valid is False:
assert message is not None
logger.debug(f"Validation failed: {message}")
reporter.report_warning(table.full_name, message)
reporter.info(
title="Unsupported M-Query",
message="DataAccess function is not present in M-Query expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
return []
except (
BaseException
) as e: # TODO: Debug why BaseException is needed here and below.
if isinstance(e, lark.exceptions.UnexpectedCharacters):
message = "Unsupported m-query expression"
title = "Unexpected Character Found"
else:
message = "Failed to parse m-query expression"
title = "Unknown Parsing Error"

reporter.report_warning(table.full_name, message)
logger.info(f"{message} for table {table.full_name}")
logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)
reporter.warning(
title=title,
message="Unknown parsing error",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
)
return []

lineage: List[resolver.Lineage] = []
try:
return resolver.MQueryResolver(
lineage = resolver.MQueryResolver(
table=table,
parse_tree=parse_tree,
reporter=reporter,
@@ -101,10 +113,13 @@
)

except BaseException as e:
reporter.report_warning(table.full_name, "Failed to process m-query expression")
logger.info(
f"Failed to process m-query expression for table {table.full_name}: {str(e)}"
reporter.warning(
title="Unknown M-Query Pattern",
message="Encountered a unknown M-Query Expression",
context=f"table-full-name={table.full_name}, expression={table.expression}, message={e}",
exc=e,
)

logger.debug(f"Stack trace for {table.full_name}:", exc_info=e)

return []
return lineage
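
For orientation, a hedged example of the kind of Databricks M-Query expression this function receives (the connector call, connection values, and table names are illustrative, not taken from this PR):

let
    Source = Databricks.Catalogs("adb-1234567890123456.7.azuredatabricks.net", "/sql/1.0/warehouses/abcd1234", [Catalog="main"]),
    main_catalog = Source{[Name="main",Kind="Catalog"]}[Data],
    sales_schema = main_catalog{[Name="sales",Kind="Schema"]}[Data],
    orders_table = sales_schema{[Name="orders",Kind="Table"]}[Data]
in
    orders_table

Resolving such an expression should ultimately yield a single upstream Databricks table, main.sales.orders.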
@@ -26,6 +26,7 @@
AbstractIdentifierAccessor,
DataAccessFunctionDetail,
IdentifierAccessor,
ReferencedTable,
)
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
@@ -157,15 +158,46 @@ def get_db_detail_from_argument(
return arguments[0], arguments[1]

@staticmethod
def get_tokens(
def create_reference_table(
arg_list: Tree,
) -> List[str]:
table_detail: Dict[str, str],
) -> Optional[ReferencedTable]:

arguments: List[str] = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(arg_list)
),
)
return arguments

logger.debug(f"Processing arguments {arguments}")

if (
len(arguments)
>= 4 # [0] is warehouse FQDN.
# [1] is endpoint, we are not using it.
# [2] is "Catalog" key
# [3] is catalog's value
):
return ReferencedTable(
warehouse=arguments[0],
catalog=arguments[3],
# As per observation, the database and catalog names are the same in M-Query
database=table_detail["Database"]
if table_detail.get("Database")
else arguments[3],
schema=table_detail["Schema"],
table=table_detail.get("Table") or table_detail["View"],
)
elif len(arguments) == 2:
return ReferencedTable(
warehouse=arguments[0],
database=table_detail["Database"],
schema=table_detail["Schema"],
table=table_detail.get("Table") or table_detail["View"],
catalog=None,
)

return None
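
As a sketch of the input this method expects (values are hypothetical, not taken from this PR), the flattened token list for a catalog-qualified Databricks call would look like:

arguments = [
    "adb-1234567890123456.7.azuredatabricks.net",  # [0] warehouse FQDN
    "/sql/1.0/warehouses/abcd1234",                # [1] endpoint, unused here
    "Catalog",                                     # [2] "Catalog" key
    "main",                                        # [3] catalog value
]
# With table_detail = {"Schema": "sales", "Table": "orders"}, the method would
# return ReferencedTable(warehouse=arguments[0], catalog="main",
# database="main", schema="sales", table="orders").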

def parse_custom_sql(
self, query: str, server: str, database: Optional[str], schema: Optional[str]
@@ -774,32 +806,22 @@ def create_lineage(
class DatabrickDataPlatformTableCreator(AbstractDataPlatformTableCreator):
def form_qualified_table_name(
self,
value_dict: Dict[Any, Any],
catalog_name: str,
table_reference: ReferencedTable,
data_platform_pair: DataPlatformPair,
server: str,
) -> str:
# database and catalog names are same in M-Query
db_name: str = (
catalog_name if "Database" not in value_dict else value_dict["Database"]
)

schema_name: str = value_dict["Schema"]

table_name: str = value_dict["Table"]

platform_detail: PlatformDetail = (
self.platform_instance_resolver.get_platform_instance(
PowerBIPlatformDetail(
data_platform_pair=data_platform_pair,
data_platform_server=server,
data_platform_server=table_reference.warehouse,
)
)
)

metastore: Optional[str] = None

qualified_table_name: str = f"{db_name}.{schema_name}.{table_name}"
qualified_table_name: str = f"{table_reference.database}.{table_reference.schema}.{table_reference.table}"

if isinstance(platform_detail, DataBricksPlatformDetail):
metastore = platform_detail.metastore
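
A hedged sketch of the names this method produces (the rest of the method is truncated above, so the metastore-prefixing behavior is an assumption):

# Illustrative values, not taken from this PR:
# ReferencedTable(warehouse="adb-...", catalog="main", database="main",
#                 schema="sales", table="orders")
# -> qualified_table_name == "main.sales.orders"
# -> presumably "<metastore>.main.sales.orders" when a DataBricksPlatformDetail
#    supplies a metastore.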
@@ -815,23 +837,23 @@ def create_lineage(
logger.debug(
f"Processing Databrick data-access function detail {data_access_func_detail}"
)
value_dict: Dict[str, str] = {}
table_detail: Dict[str, str] = {}
temp_accessor: Optional[
Union[IdentifierAccessor, AbstractIdentifierAccessor]
] = data_access_func_detail.identifier_accessor

while temp_accessor:
if isinstance(temp_accessor, IdentifierAccessor):
# Condition to handle databricks M-query pattern where table, schema and database all are present in
# same invoke statement
# the same invoke statement
if all(
element in temp_accessor.items
for element in ["Item", "Schema", "Catalog"]
):
value_dict["Schema"] = temp_accessor.items["Schema"]
value_dict["Table"] = temp_accessor.items["Item"]
table_detail["Schema"] = temp_accessor.items["Schema"]
table_detail["Table"] = temp_accessor.items["Item"]
else:
value_dict[temp_accessor.items["Kind"]] = temp_accessor.items[
table_detail[temp_accessor.items["Kind"]] = temp_accessor.items[
"Name"
]

@@ -845,42 +867,36 @@ def create_lineage(
)
return Lineage.empty()

arguments = self.get_tokens(data_access_func_detail.arg_list)
if len(arguments) < 4:
logger.info(
f"Databricks workspace and catalog information in arguments({arguments}). "
f"Skipping upstream table"
)
return Lineage.empty()

workspace_fqdn: str = arguments[0]
table_reference = self.create_reference_table(
arg_list=data_access_func_detail.arg_list,
table_detail=table_detail,
)

catalog_name: str = arguments[3]
if table_reference:
qualified_table_name: str = self.form_qualified_table_name(
table_reference=table_reference,
data_platform_pair=self.get_platform_pair(),
)

qualified_table_name: str = self.form_qualified_table_name(
value_dict=value_dict,
catalog_name=catalog_name,
data_platform_pair=self.get_platform_pair(),
server=workspace_fqdn,
)
urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=table_reference.warehouse,
qualified_table_name=qualified_table_name,
)

urn = urn_creator(
config=self.config,
platform_instance_resolver=self.platform_instance_resolver,
data_platform_pair=self.get_platform_pair(),
server=workspace_fqdn,
qualified_table_name=qualified_table_name,
)
return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)

return Lineage(
upstreams=[
DataPlatformTable(
data_platform_pair=self.get_platform_pair(),
urn=urn,
)
],
column_lineage=[],
)
return Lineage.empty()

def get_platform_pair(self) -> DataPlatformPair:
return SupportedDataPlatform.DATABRICK_SQL.value
@@ -135,7 +135,7 @@ def make_function_name(tree: Tree) -> str:

def get_all_function_name(tree: Tree) -> List[str]:
"""
Returns all function name present in input tree
Returns all function names present in an input tree
:param tree: Input lexical tree
:return: list of function name
"""
@@ -15,7 +15,7 @@ def validate_parse_tree(
:param tree: tree to validate as per functions supported by m_parser module
:param native_query_enabled: Whether user want to extract lineage from native query
:return: first argument is False if validation is failed and second argument would contain the error message.
in-case of valid tree the first argument is True and second argument would be None.
in the case of valid tree, the first argument is True and the second argument would be None.
"""
functions: List[str] = tree_function.get_all_function_name(tree)
if len(functions) == 0:
@@ -452,7 +452,7 @@ def to_datahub_dataset(
]
),
)
# normally the person who configure the dataset will be the most accurate person for ownership
# normally, the person who configures the dataset will be the most accurate person for ownership
if (
self.__config.extract_ownership
and self.__config.ownership.dataset_configured_by_as_owner
@@ -1334,6 +1334,21 @@ def validate_dataset_type_mapping(self):
def extract_independent_datasets(
self, workspace: powerbi_data_classes.Workspace
) -> Iterable[MetadataWorkUnit]:
if self.source_config.extract_independent_datasets is False:
if workspace.independent_datasets:
self.reporter.info(
title="Skipped Independent Dataset",
message="Some datasets are not used in any visualizations. To ingest them, enable the `extract_independent_datasets` flag",
context=",".join(
[
dataset.name
for dataset in workspace.independent_datasets
if dataset.name
]
),
)
return

for dataset in workspace.independent_datasets:
yield from auto_workunit(
stream=self.mapper.to_datahub_dataset(
@@ -422,11 +422,6 @@ def _fill_metadata_from_scan_result(
return workspaces

def _fill_independent_datasets(self, workspace: Workspace) -> None:
if self.__config.extract_independent_datasets is False:
logger.info(
"Skipping independent datasets retrieval as extract_independent_datasets is set to false"
)
return

reachable_datasets: List[str] = []
# Find out reachable datasets