Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/powerbi): DatabricksMultiCloud native query support #11756

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
11c8da1
remove ANSI escape sequence
sid-acryl Oct 30, 2024
96abb10
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Oct 30, 2024
73aab8f
Remove ANSI escape characters
sid-acryl Oct 30, 2024
08de42a
log message
sid-acryl Oct 30, 2024
a4af0ce
Trigger build
sid-acryl Oct 30, 2024
4e53d5d
Native sql support for DatabricksMultiCloud
sid-acryl Oct 30, 2024
4a5af75
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Oct 30, 2024
15d55e9
lint fix
sid-acryl Oct 30, 2024
774172c
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Oct 30, 2024
b2fd30b
Trigger build
sid-acryl Oct 30, 2024
fcb4bb5
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
hsheth2 Oct 30, 2024
1b3de82
replace "" by "
sid-acryl Oct 30, 2024
16dc5ea
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Oct 30, 2024
369ccfc
merge conflixt
sid-acryl Oct 31, 2024
37e36df
multi function call support
sid-acryl Oct 31, 2024
8db54c4
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
hsheth2 Oct 31, 2024
de5d993
fix partition exec
hsheth2 Oct 31, 2024
7bdd3cf
improve reporting
hsheth2 Nov 1, 2024
96897bb
tweak comments in partitioned executor
hsheth2 Nov 1, 2024
c37a361
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 5, 2024
4a79c67
configurable m-query parse timeout
sid-acryl Nov 5, 2024
0a6e680
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 5, 2024
4fbe7a5
fix each expression
sid-acryl Nov 6, 2024
c283e54
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 6, 2024
94e0e4f
fix special characters
sid-acryl Nov 6, 2024
f40dd5a
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 6, 2024
fa95918
improve report
sid-acryl Nov 6, 2024
6d52c2d
remove drop table statement from M-Query sql
sid-acryl Nov 6, 2024
ca6d06f
Add sql parsing error in report
sid-acryl Nov 7, 2024
6b81cbb
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 7, 2024
f26a06e
fix for <class 'AttributeError'>: 'NoneType' object has no attribute …
sid-acryl Nov 7, 2024
b3be595
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 7, 2024
df89502
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 7, 2024
7e5c5ea
info to warning
sid-acryl Nov 7, 2024
aa8cce6
info to warning
sid-acryl Nov 7, 2024
2ef3456
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 7, 2024
e4781fe
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
anshbansal Nov 7, 2024
9927c76
grammar fix for empty string
sid-acryl Nov 7, 2024
5348a6a
fix for timeout and alias syntax in native query
sid-acryl Nov 8, 2024
165d940
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 8, 2024
920f63c
trying hang fix
sid-acryl Nov 9, 2024
76ffd49
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 9, 2024
edf6d7a
switch to thread
sid-acryl Nov 9, 2024
c8e4667
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 9, 2024
413cb71
fix for lark infinite loop
sid-acryl Nov 11, 2024
af1ed75
merge
sid-acryl Nov 11, 2024
90eb8fb
timeout for api-call
sid-acryl Nov 11, 2024
449c525
add reporter warning for metadata timeout
sid-acryl Nov 12, 2024
64f60f4
revert threading_timeout code
sid-acryl Nov 12, 2024
2e2cf2d
address review comments
sid-acryl Nov 12, 2024
5d31d51
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 12, 2024
66de5fa
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 12, 2024
dfc180e
Add timeout test-case
sid-acryl Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class Constant:
APP_SUB_TYPE = "App"
STATE = "state"
ACTIVE = "Active"
SQL_PARSING_FAILURE = "SQL Parsing Failure"
M_QUERY_NULL = '"null"'


@dataclass
Expand Down Expand Up @@ -175,6 +177,11 @@ class SupportedDataPlatform(Enum):
powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks"
)

DatabricksMultiCloud_SQL = DataPlatformPair(
powerbi_data_platform_name="DatabricksMultiCloud",
datahub_data_platform_name="databricks",
)


@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
Expand All @@ -199,6 +206,8 @@ class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
m_query_parse_unexpected_character_errors: int = 0
m_query_parse_unknown_errors: int = 0
m_query_resolver_errors: int = 0
m_query_resolver_no_lineage: int = 0
m_query_resolver_successes: int = 0

def report_dashboards_scanned(self, count: int = 1) -> None:
self.dashboards_scanned += count
Expand Down Expand Up @@ -495,6 +504,18 @@ class PowerBiDashboardSourceConfig(
description="Whether to ingest workspace app. Requires DataHub server 0.14.2+.",
)

m_query_parse_timeout: int = pydantic.Field(
default=70,
description="Timeout for PowerBI M-query parsing in seconds. Table-level lineage is determined by analyzing the M-query expression. "
"Increase this value if you encounter the 'M-Query Parsing Timeout' message in the connector report.",
)

metadata_api_timeout: int = pydantic.Field(
default=30,
description="timeout in seconds for Metadata Rest Api.",
hidden_from_docs=True,
)

@root_validator(skip_on_failure=True)
def validate_extract_column_level_lineage(cls, values: Dict) -> Dict:
flags = [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from typing import List, Optional

import sqlparse
Expand All @@ -9,14 +10,29 @@
create_lineage_sql_parsed_result,
)

SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"]
# PowerBI M-Query escape sequences for newline (\n) and tab (\t), mapped to the characters they represent
SPECIAL_CHARACTERS = {
"#(lf)": "\n",
"(lf)": "\n",
"#(tab)": "\t",
}

ANSI_ESCAPE_CHARACTERS = r"\x1b\[[0-9;]*m"

logger = logging.getLogger(__name__)


def remove_special_characters(native_query: str) -> str:
for char in SPECIAL_CHARACTERS:
native_query = native_query.replace(char, " ")
native_query = native_query.replace(char, SPECIAL_CHARACTERS[char])

ansi_escape_regx = re.compile(ANSI_ESCAPE_CHARACTERS)

native_query = ansi_escape_regx.sub("", native_query)

# Replace doubled double quotes ("") with a single double quote ("): sqlglot does not handle a column alias surrounded by two double quotes

native_query = native_query.replace('""', '"')

return native_query

Expand Down Expand Up @@ -53,6 +69,15 @@ def get_tables(native_query: str) -> List[str]:
return tables


def remove_drop_statement(query: str) -> str:
    """Strip ``DROP TABLE IF EXISTS #<identifier>`` statements from *query*.

    Certain PowerBI M-Queries contain a combination of DROP and SELECT
    statements within one SQL string, which causes the SQL parser to fail
    on these queries, so the DROP occurrences are removed.

    Args:
        query: Native SQL text extracted from an M-Query expression.

    Returns:
        The query with every matching DROP statement (and its optional
        trailing semicolon) removed; all other text is left untouched.
    """
    # SQL keywords are case-insensitive and may be separated by any run of
    # whitespace (including newlines), so match them that way. Only
    # identifiers that start with '#' are targeted.
    pattern = r"DROP\s+TABLE\s+IF\s+EXISTS\s+#\w+;?"

    return re.sub(pattern, "", query, flags=re.IGNORECASE)


def parse_custom_sql(
ctx: PipelineContext,
query: str,
Expand All @@ -65,12 +90,10 @@ def parse_custom_sql(

logger.debug("Using sqlglot_lineage to parse custom sql")

sql_query = remove_special_characters(query)

logger.debug(f"Processing native query = {sql_query}")
logger.debug(f"Processing native query using DataHub Sql Parser = {query}")

return create_lineage_sql_parsed_result(
query=sql_query,
query=query,
default_schema=schema,
default_db=database,
platform=platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@ def get_lark_parser() -> Lark:
return Lark(grammar, start="let_expression", regex=True)


def _parse_expression(expression: str) -> Tree:
def _parse_expression(expression: str, parse_timeout: int = 60) -> Tree:
lark_parser: Lark = get_lark_parser()

# Replace U+00a0 NO-BREAK SPACE with a normal space.
# Sometimes PowerBI returns expressions with this character and it breaks the parser.
expression = expression.replace("\u00a0", " ")

# The parser resolves variable=null to variable='', so the Tree ends up with an empty string.
# To distinguish an empty string from null, rewrite =null as ="null" before parsing.
expression = expression.replace("=null", '="null"')

logger.debug(f"Parsing expression = {expression}")
with threading_timeout(_M_QUERY_PARSE_TIMEOUT):
with threading_timeout(parse_timeout):
parse_tree: Tree = lark_parser.parse(expression)

if TRACE_POWERBI_MQUERY_PARSER:
Expand Down Expand Up @@ -74,30 +78,33 @@ def get_upstream_tables(
)

try:
with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(table.expression)

valid, message = validator.validate_parse_tree(
parse_tree, native_query_enabled=config.native_query_parsing
table.expression, native_query_enabled=config.native_query_parsing
)
if valid is False:
assert message is not None
logger.debug(f"Validation failed: {message}")
reporter.info(
title="Unsupported M-Query",
message="DataAccess function is not present in M-Query expression",
title="Non-Data Platform Expression",
message=message,
context=f"table-full-name={table.full_name}, expression={table.expression}, message={message}",
)
reporter.m_query_parse_validation_errors += 1
return []

with reporter.m_query_parse_timer:
reporter.m_query_parse_attempts += 1
parse_tree: Tree = _parse_expression(
table.expression, parse_timeout=config.m_query_parse_timeout
)

except KeyboardInterrupt:
raise
except TimeoutException:
reporter.m_query_parse_timeouts += 1
reporter.warning(
title="M-Query Parsing Timeout",
message=f"M-Query parsing timed out after {_M_QUERY_PARSE_TIMEOUT} seconds. Lineage for this table will not be extracted.",
message=f"M-Query parsing timed out after {config.m_query_parse_timeout} seconds. Lineage for this table will not be extracted.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
)
return []
Expand All @@ -112,7 +119,7 @@ def get_upstream_tables(
reporter.m_query_parse_unknown_errors += 1

reporter.warning(
title="Unable to extract lineage from M-Query expression",
title="Unable to parse M-Query expression",
message=f"Got an '{error_type}' while parsing the expression. Lineage will be missing for this table.",
context=f"table-full-name={table.full_name}, expression={table.expression}",
exc=e,
Expand All @@ -132,6 +139,10 @@ def get_upstream_tables(
platform_instance_resolver=platform_instance_resolver,
)

if lineage:
reporter.m_query_resolver_successes += 1
else:
reporter.m_query_resolver_no_lineage += 1
return lineage

except BaseException as e:
Expand Down
Loading
Loading