feat(ingestion/powerbi): DatabricksMultiCloud native query support #11756

Merged
Changes from 10 commits

Commits (53)
11c8da1
remove ANSI escape sequence
sid-acryl Oct 30, 2024
96abb10
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Oct 30, 2024
73aab8f
Remove ANSI escape characters
sid-acryl Oct 30, 2024
08de42a
log message
sid-acryl Oct 30, 2024
a4af0ce
Trigger build
sid-acryl Oct 30, 2024
4e53d5d
Native sql support for DatabricksMultiCloud
sid-acryl Oct 30, 2024
4a5af75
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Oct 30, 2024
15d55e9
lint fix
sid-acryl Oct 30, 2024
774172c
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Oct 30, 2024
b2fd30b
Trigger build
sid-acryl Oct 30, 2024
fcb4bb5
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
hsheth2 Oct 30, 2024
1b3de82
replace "" by "
sid-acryl Oct 30, 2024
16dc5ea
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Oct 30, 2024
369ccfc
merge conflixt
sid-acryl Oct 31, 2024
37e36df
multi function call support
sid-acryl Oct 31, 2024
8db54c4
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
hsheth2 Oct 31, 2024
de5d993
fix partition exec
hsheth2 Oct 31, 2024
7bdd3cf
improve reporting
hsheth2 Nov 1, 2024
96897bb
tweak comments in partitioned executor
hsheth2 Nov 1, 2024
c37a361
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 5, 2024
4a79c67
configurable m-query parse timeout
sid-acryl Nov 5, 2024
0a6e680
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 5, 2024
4fbe7a5
fix each expression
sid-acryl Nov 6, 2024
c283e54
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 6, 2024
94e0e4f
fix special characters
sid-acryl Nov 6, 2024
f40dd5a
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 6, 2024
fa95918
improve report
sid-acryl Nov 6, 2024
6d52c2d
remove drop table statement from M-Query sql
sid-acryl Nov 6, 2024
ca6d06f
Add sql parsing error in report
sid-acryl Nov 7, 2024
6b81cbb
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 7, 2024
f26a06e
fix for <class 'AttributeError'>: 'NoneType' object has no attribute …
sid-acryl Nov 7, 2024
b3be595
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 7, 2024
df89502
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 7, 2024
7e5c5ea
info to warning
sid-acryl Nov 7, 2024
aa8cce6
info to warning
sid-acryl Nov 7, 2024
2ef3456
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 7, 2024
e4781fe
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
anshbansal Nov 7, 2024
9927c76
grammar fix for empty string
sid-acryl Nov 7, 2024
5348a6a
fix for timeout and alias syntax in native query
sid-acryl Nov 8, 2024
165d940
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 8, 2024
920f63c
trying hang fix
sid-acryl Nov 9, 2024
76ffd49
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 9, 2024
edf6d7a
switch to thread
sid-acryl Nov 9, 2024
c8e4667
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 9, 2024
413cb71
fix for lark infinite loop
sid-acryl Nov 11, 2024
af1ed75
merge
sid-acryl Nov 11, 2024
90eb8fb
timeout for api-call
sid-acryl Nov 11, 2024
449c525
add reporter warning for metadata timeout
sid-acryl Nov 12, 2024
64f60f4
revert threading_timeout code
sid-acryl Nov 12, 2024
2e2cf2d
address review comments
sid-acryl Nov 12, 2024
5d31d51
Merge branch 'master' into ing-755-databricks-multicloud-native-sql
sid-acryl Nov 12, 2024
66de5fa
Merge branch 'ing-755-databricks-multicloud-native-sql' of https://gi…
sid-acryl Nov 12, 2024
dfc180e
Add timeout test-case
sid-acryl Nov 12, 2024
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -484,7 +484,7 @@
"teradata": sql_common
| usage_common
| sqlglot_lib
| {"teradatasqlalchemy>=17.20.0.0"},
| {"teradatasqlalchemy>=17.20.0.0,<=20.0.0.2"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
@@ -173,6 +173,11 @@ class SupportedDataPlatform(Enum):
powerbi_data_platform_name="Databricks", datahub_data_platform_name="databricks"
)

DatabricksMultiCloud_SQL = DataPlatformPair(
powerbi_data_platform_name="DatabricksMultiCloud",
datahub_data_platform_name="databricks",
)


@dataclass
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
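The new enum member maps the PowerBI connector name onto DataHub's existing databricks platform. A minimal sketch of what that mapping implies, assuming SupportedDataPlatform is imported from the module in this diff:

pair = SupportedDataPlatform.DatabricksMultiCloud_SQL.value
assert pair.powerbi_data_platform_name == "DatabricksMultiCloud"
assert pair.datahub_data_platform_name == "databricks"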
@@ -1,4 +1,5 @@
import logging
import re
from typing import List, Optional

import sqlparse
@@ -11,13 +12,21 @@

SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"]

ANSI_ESCAPE_CHARACTERS = r"\x1b\[[0-9;]*m"

logger = logging.getLogger(__name__)


def remove_special_characters(native_query: str) -> str:
for char in SPECIAL_CHARACTERS:
native_query = native_query.replace(char, " ")

ansi_escape_regx = re.compile(ANSI_ESCAPE_CHARACTERS)

logger.debug("Removing ANSI escape characters")

native_query = ansi_escape_regx.sub("", native_query)

return native_query


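A self-contained sketch of the cleanup this file now performs; the constants and logic follow the diff above, while the sample query string is made up:

import re

SPECIAL_CHARACTERS = ["#(lf)", "(lf)", "#(tab)"]
ANSI_ESCAPE_CHARACTERS = r"\x1b\[[0-9;]*m"

def remove_special_characters(native_query: str) -> str:
    # Blank out Power Query whitespace tokens, then strip ANSI escape sequences.
    for char in SPECIAL_CHARACTERS:
        native_query = native_query.replace(char, " ")
    return re.sub(ANSI_ESCAPE_CHARACTERS, "", native_query)

raw = 'select SALE_NO as "\x1b[4mSaleNo\x1b[0m"#(lf)from reports'
assert remove_special_characters(raw) == 'select SALE_NO as "SaleNo" from reports'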
@@ -1028,6 +1028,7 @@ class NativeQueryDataPlatformTableCreator(AbstractDataPlatformTableCreator):
SUPPORTED_NATIVE_QUERY_DATA_PLATFORM: dict = {
SupportedDataPlatform.SNOWFLAKE.value.powerbi_data_platform_name: SupportedDataPlatform.SNOWFLAKE,
SupportedDataPlatform.AMAZON_REDSHIFT.value.powerbi_data_platform_name: SupportedDataPlatform.AMAZON_REDSHIFT,
SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name: SupportedDataPlatform.DatabricksMultiCloud_SQL,
}
current_data_platform: SupportedDataPlatform = SupportedDataPlatform.SNOWFLAKE

@@ -1075,6 +1076,27 @@ def create_urn_using_old_parser(self, query: str, server: str) -> Lineage:
column_lineage=[],
)

def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
# In-case of DatabricksMultiCloud_SQL the database-name is the catalog name,
# and that name is not available in SQL statement
if (
data_access_tokens[0]
!= SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
):
return None

if (
len(data_access_tokens) >= 13
Collaborator: where are these numbers coming from?

Contributor Author: Updated the code

): # Explicit catalog name is set in Database argument
return tree_function.strip_char(data_access_tokens[9])

if (
len(data_access_tokens) >= 6 and data_access_tokens[4] == "Catalog"
): # use Catalog name is database
return tree_function.strip_char(data_access_tokens[5])

return None
Collaborator: overall this method seems extremely brittle

Contributor Author: Updated the code

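To the reviewer's question about where these numbers come from: a standalone replica of the index checks, driven by a purely hypothetical token layout (the real flattening is produced by the lark grammar, so the positions below are illustrative, not guaranteed):

from typing import List, Optional

def get_db_name_sketch(tokens: List[str]) -> Optional[str]:
    # Mirrors the branch logic of get_db_name above.
    if tokens[0] != "DatabricksMultiCloud":
        return None
    if len(tokens) >= 13:  # options record also carries an explicit Database= argument
        return tokens[9].strip('"')
    if len(tokens) >= 6 and tokens[4] == "Catalog":
        return tokens[5].strip('"')
    return None

# Hypothetical flattening of:
#   DatabricksMultiCloud.Catalogs("foo.com", "/sql/warehouses/x", [Catalog="sales_db", ...])
tokens = ["DatabricksMultiCloud", '"foo.com"', '"/sql/warehouses/x"',
          "[", "Catalog", '"sales_db"']
assert get_db_name_sketch(tokens) == "sales_db"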

def create_lineage(
self, data_access_func_detail: DataAccessFunctionDetail
) -> Lineage:
@@ -1089,6 +1111,7 @@ def create_lineage(
)
logger.debug(f"Flat argument list = {flat_argument_list}")
return Lineage.empty()

data_access_tokens: List[str] = tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[0])
)
@@ -1101,6 +1124,8 @@
f"NativeQuery is supported only for {self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM}"
)

return Lineage.empty()

if len(data_access_tokens[0]) < 3:
logger.debug(
f"Server is not available in argument list for data-platform {data_access_tokens[0]}. Returning empty "
@@ -1111,8 +1136,7 @@
self.current_data_platform = self.SUPPORTED_NATIVE_QUERY_DATA_PLATFORM[
data_access_tokens[0]
]

# First argument is the query
# The First argument is the query
sql_query: str = tree_function.strip_char_from_list(
values=tree_function.remove_whitespaces_from_list(
tree_function.token_values(flat_argument_list[1])
@@ -1130,10 +1154,12 @@
server=server,
)

database_name: Optional[str] = self.get_db_name(data_access_tokens)

return self.parse_custom_sql(
query=sql_query,
server=server,
database=None, # database and schema is available inside custom sql as per PowerBI Behavior
database=database_name,
schema=None,
)

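Worked example (consistent with the integration test added below): for Value.NativeQuery(DatabricksMultiCloud.Catalogs(..., [Catalog="sales_db", ...]), "select * from public.slae_history ..."), get_db_name supplies "sales_db" as the database, the two-part table in the SQL resolves to sales_db.public.slae_history, and the upstream URN becomes urn:li:dataset:(urn:li:dataPlatform:databricks,sales_db.public.slae_history,PROD).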
@@ -46,8 +46,8 @@ def get_variable_statement(parse_tree: Tree, variable: str) -> Optional[Tree]:

def get_first_rule(tree: Tree, rule: str) -> Optional[Tree]:
"""
Lark library doesn't have advance search function.
This function will return the first tree of provided rule
Lark library doesn't have an advance search function.
This function will return the first tree of the provided rule
:param tree: Tree to search for the expression rule
:return: Tree
"""
@@ -99,7 +99,6 @@ def internal(node: Union[Tree, Token]) -> None:
logger.debug(f"Unable to resolve parameter reference to {ref}")
values.append(ref)
elif isinstance(node, Token):
# This means we're probably looking at a literal.
values.append(cast(Token, node).value)
return
else:
@@ -120,10 +119,14 @@ def remove_whitespaces_from_list(values: List[str]) -> List[str]:
return result


def strip_char(value: str, char: str = '"') -> str:
return value.strip(char)


def strip_char_from_list(values: List[str], char: str = '"') -> List[str]:
result: List[str] = []
for item in values:
result.append(item.strip(char))
result.append(strip_char(item.strip(char), char=char))

return result

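A quick sanity check of the new helpers; a sketch that assumes strip_char and strip_char_from_list are imported from this tree_function module:

assert strip_char('"summary"') == "summary"  # default char is the double quote
assert strip_char_from_list(['"sales_db"', "public"]) == ["sales_db", "public"]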
34 changes: 33 additions & 1 deletion metadata-ingestion/tests/integration/powerbi/test_m_parser.py
@@ -51,7 +51,8 @@
'let\n Source = DatabricksMultiCloud.Catalogs("abc.cloud.databricks.com", "/sql/gh2cfe3fe1d4c7cd", [Catalog="data_analysis", Database="summary", EnableAutomaticProxyDiscovery=null]),\n vips_data_summary_dev = Source{[Item="vips_data",Schema="summary",Catalog="data_analysis"]}[Data],\n #"Changed Type" = Table.TransformColumnTypes(vips_data_summary_dev,{{"vipstartDate", type date}, {"enteredDate", type datetime}, {"estDraftDate", type datetime}, {"estPublDate", type datetime}})\nin\n #"Changed Type"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="ORDERING"]}[Data], "SELECT#(lf) DISTINCT#(lf) T5.PRESENTMENT_START_DATE#(lf),T5.PRESENTMENT_END_DATE#(lf),T5.DISPLAY_NAME#(lf),T5.NAME#(tab)#(lf),T5.PROMO_DISPLAY_NAME#(lf),T5.REGION#(lf),T5.ID#(lf),T5.WALKOUT#(lf),T6.DEAL_ID#(lf),T6.TYPE#(lf),T5.FREE_PERIOD#(lf),T6.PRICE_MODIFICATION#(lf)#(lf)FROM#(lf)#(lf)(#(lf) SELECT #(lf) T1.NAME#(lf),DATE(T1.CREATED_AT) as CREATED_AT#(lf),T1.PROMO_CODE#(lf),T1.STATUS#(lf),DATE(T1.UPDATED_AT) as UPDATED_AT#(lf),T1.ID#(lf),T1.DISPLAY_NAME as PROMO_DISPLAY_NAME#(lf),T4.*#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) NAME#(lf),CREATED_AT#(lf),PROMO_CODE#(lf),STATUS#(lf),UPDATED_AT#(lf),ID#(lf),DISPLAY_NAME#(lf) FROM RAW.PROMOTIONS#(lf)#(lf)) T1#(lf)INNER JOIN#(lf)#(lf) (#(lf) SELECT #(lf) T3.PRODUCT_STATUS#(lf),T3.CODE#(lf),T3.REGION#(lf),T3.DISPLAY_ORDER_SEQUENCE#(lf),T3.PRODUCT_LINE_ID#(lf),T3.DISPLAY_NAME#(lf),T3.PRODUCT_TYPE#(lf),T3.ID as PROD_TBL_ID#(lf),T3.NAME as PROD_TBL_NAME#(lf),DATE(T2.PRESENTMENT_END_DATE) as PRESENTMENT_END_DATE#(lf),T2.PRICE_COMMITMENT_PERIOD#(lf),T2.NAME as SEAL_TBL_NAME#(lf),DATE(T2.CREATED_AT) as SEAL_TBL_CREATED_AT#(lf),T2.DESCRIPTION#(lf),T2.FREE_PERIOD#(lf),T2.WALKOUT#(lf),T2.PRODUCT_CAT_ID#(lf),T2.PROMOTION_ID#(lf),DATE(T2.PRESENTMENT_START_DATE) as PRESENTMENT_START_DATE#(lf),YEAR(T2.PRESENTMENT_START_DATE) as DEAL_YEAR_START#(lf),MONTH(T2.PRESENTMENT_START_DATE) as DEAL_MONTH_START#(lf),T2.DEAL_TYPE#(lf),DATE(T2.UPDATED_AT) as SEAL_TBL_UPDATED_AT#(lf),T2.ID as SEAL_TBL_ID#(lf),T2.STATUS as SEAL_TBL_STATUS#(lf)FROM#(lf)(SELECT#(lf) DISTINCT#(lf) PRODUCT_STATUS#(lf),CODE#(lf),REGION#(lf),DISPLAY_ORDER_SEQUENCE#(lf),PRODUCT_LINE_ID#(lf),DISPLAY_NAME#(lf),PRODUCT_TYPE#(lf),ID #(lf),NAME #(lf) FROM#(lf) RAW.PRODUCTS#(lf)#(lf)) T3#(lf)INNER JOIN#(lf)(#(lf) SELECT#(lf) DISTINCT#(lf) PRESENTMENT_END_DATE#(lf),PRICE_COMMITMENT_PERIOD#(lf),NAME#(lf),CREATED_AT#(lf),DESCRIPTION#(lf),FREE_PERIOD#(lf),WALKOUT#(lf),PRODUCT_CAT_ID#(lf),PROMOTION_ID#(lf),PRESENTMENT_START_DATE#(lf),DEAL_TYPE#(lf),UPDATED_AT#(lf),ID#(lf),STATUS#(lf) FROM#(lf) RAW.DEALS#(lf)#(lf)) T2#(lf)ON#(lf)T3.ID = T2.PRODUCT_CAT_ID #(lf)WHERE#(lf)T2.PRESENTMENT_START_DATE >= \'2015-01-01\'#(lf)AND#(lf)T2.STATUS = \'active\'#(lf)#(lf))T4#(lf)ON#(lf)T1.ID = T4.PROMOTION_ID#(lf))T5#(lf)INNER JOIN#(lf)RAW.PRICE_MODIFICATIONS T6#(lf)ON#(lf)T5.SEAL_TBL_ID = T6.DEAL_ID", null, [EnableFolding=true]) \n in \n Source',
'let\n Source = Databricks.Catalogs(#"hostname",#"http_path", null),\n edp_prod_Database = Source{[Name=#"catalog",Kind="Database"]}[Data],\n gold_Schema = edp_prod_Database{[Name=#"schema",Kind="Schema"]}[Data],\n pet_view = gold_Schema{[Name="pet_list",Kind="View"]}[Data],\n #"Filtered Rows" = Table.SelectRows(pet_view, each true),\n #"Removed Columns" = Table.RemoveColumns(#"Filtered Rows",{"created_timestmp"})\nin\n #"Removed Columns"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""SaleNo""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"',
'let\n Source = Value.NativeQuery(Snowflake.Databases("0DD93C6BD5A6.snowflakecomputing.com","sales_analytics_warehouse_prod",[Role="sales_analytics_member_ad"]){[Name="SL_OPERATIONS"]}[Data], "select SALE_NO AS ""\x1b[4mSaleNo\x1b[0m""#(lf) ,CODE AS ""Code""#(lf) ,ENDDATE AS ""end_date""#(lf) from SL_OPERATIONS.SALE.REPORTS#(lf) where ENDDATE > \'2024-02-03\'", null, [EnableFolding=true]),\n #"selected Row" = Table.SelectRows(Source)\nin\n #"selected Row"',
'let\n Source = Value.NativeQuery(DatabricksMultiCloud.Catalogs("foo.com", "/sql/1.0/warehouses/423423ew", [Catalog="sales_db", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="sales_db",Kind="Database"]}[Data], "select * from public.slae_history#(lf)where creation_timestamp >= getDate(-3)", null, [EnableFolding=true]),\n #"NewTable" = Table.TransformColumn(Source,{{"creation_timestamp", type date}})\nin\n #"NewTable"',
]

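In this revision the fixtures change in two ways: the existing Snowflake query is modified to embed ANSI escape sequences (exercising the new cleanup in remove_special_characters), and a DatabricksMultiCloud.Catalogs native query is added to drive the new platform mapping and catalog handling asserted in test_databricks_multicloud below.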

@@ -961,3 +962,34 @@ def test_snowflake_double_double_quotes():
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:snowflake,sl_operations.sale.reports,PROD)"
)


def test_databricks_multicloud():
q = M_QUERIES[31]
Collaborator: we added two statements above, but only one is tested?

Contributor Author: deleted the duplicate one

table: powerbi_data_classes.Table = powerbi_data_classes.Table(
columns=[],
measures=[],
expression=q,
name="virtual_order_table",
full_name="OrderDataSet.virtual_order_table",
)

reporter = PowerBiDashboardSourceReport()

ctx, config, platform_instance_resolver = get_default_instances()

config.enable_advance_lineage_sql_construct = True

data_platform_tables: List[DataPlatformTable] = parser.get_upstream_tables(
table,
reporter,
ctx=ctx,
config=config,
platform_instance_resolver=platform_instance_resolver,
)[0].upstreams

assert len(data_platform_tables) == 1
assert (
data_platform_tables[0].urn
== "urn:li:dataset:(urn:li:dataPlatform:databricks,sales_db.public.slae_history,PROD)"
)
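Locally, the new case can be run in isolation with pytest's -k filter against metadata-ingestion/tests/integration/powerbi/test_m_parser.py (the path from the file header above), e.g. -k databricks_multicloud.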