feat(ingest/snowflake): support lineage via rename and swap using que… #11600

Merged: 9 commits, Oct 23, 2024
@@ -50,6 +50,8 @@
PreparsedQuery,
SqlAggregatorReport,
SqlParsingAggregator,
TableRename,
TableSwap,
)
from datahub.sql_parsing.sql_parsing_common import QueryType
from datahub.sql_parsing.sqlglot_lineage import (
@@ -109,11 +111,14 @@ class SnowflakeQueriesSourceConfig(
@dataclass
class SnowflakeQueriesExtractorReport(Report):
copy_history_fetch_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
table_renames_fetch_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
query_log_fetch_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)

audit_log_load_timer: PerfTimer = dataclasses.field(default_factory=PerfTimer)
sql_aggregator: Optional[SqlAggregatorReport] = None

num_ddl_queries_dropped: int = 0


@dataclass
class SnowflakeQueriesSourceReport(SourceReport):
@@ -218,7 +223,9 @@ def get_workunits_internal(
audit_log_file = self.local_temp_path / "audit_log.sqlite"
use_cached_audit_log = audit_log_file.exists()

queries: FileBackedList[Union[KnownLineageMapping, PreparsedQuery]]
queries: FileBackedList[
Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
]
if use_cached_audit_log:
logger.info("Using cached audit log")
shared_connection = ConnectionWrapper(audit_log_file)
@@ -228,7 +235,7 @@

shared_connection = ConnectionWrapper(audit_log_file)
queries = FileBackedList(shared_connection)
entry: Union[KnownLineageMapping, PreparsedQuery]
entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]

with self.report.copy_history_fetch_timer:
for entry in self.fetch_copy_history():
@@ -285,7 +292,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]:

def fetch_query_log(
self,
) -> Iterable[PreparsedQuery]:
) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
query_log_query = _build_enriched_query_log_query(
start_time=self.config.window.start_time,
end_time=self.config.window.end_time,
@@ -313,12 +320,16 @@ def fetch_query_log(
exc=e,
)
else:
yield entry
if entry:
yield entry

def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:
def _parse_audit_log_row(
self, row: Dict[str, Any]
) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]:
json_fields = {
"DIRECT_OBJECTS_ACCESSED",
"OBJECTS_MODIFIED",
"OBJECT_MODIFIED_BY_DDL",
}

res = {}
@@ -330,6 +341,15 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:

direct_objects_accessed = res["direct_objects_accessed"]
objects_modified = res["objects_modified"]
object_modified_by_ddl = res["object_modified_by_ddl"]

if object_modified_by_ddl and not objects_modified:
ddl_entry: Optional[Union[TableRename, TableSwap]] = None
with self.structured_reporter.report_exc(
"Error fetching ddl lineage from Snowflake"
):
ddl_entry = self.parse_ddl_query(object_modified_by_ddl)
return ddl_entry

upstreams = []
column_usage = {}
@@ -426,6 +446,45 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:
)
return entry

def parse_ddl_query(
self, object_modified_by_ddl: dict
) -> Optional[Union[TableRename, TableSwap]]:
if object_modified_by_ddl[
"operationType"
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
urn1 = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["objectName"]
)
)

urn2 = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
)
)

return TableSwap(urn1, urn2)
elif object_modified_by_ddl[
"operationType"
] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
original_urn = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["objectName"]
)
)

new_urn = self.identifiers.gen_dataset_urn(
self.identifiers.get_dataset_identifier_from_qualified_name(
object_modified_by_ddl["properties"]["objectName"]["value"]
)
)

return TableRename(original_urn, new_urn)
else:
self.report.num_ddl_queries_dropped += 1
return None
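
For reference, a sketch of the payload shapes parse_ddl_query branches on. The field names mirror the code above; the concrete values are assumed for illustration.

# Illustrative OBJECT_MODIFIED_BY_DDL payloads (values assumed).
swap_payload = {
    "operationType": "ALTER",
    "objectName": "DB1.SCHEMA1.ORDERS",
    "properties": {"swapTargetName": {"value": "DB1.SCHEMA1.ORDERS_STAGING"}},
}
rename_payload = {
    "operationType": "RENAME_TABLE",
    "objectName": "DB1.SCHEMA1.ORDERS_TMP",
    "properties": {"objectName": {"value": "DB1.SCHEMA1.ORDERS"}},
}
# parse_ddl_query(swap_payload) would yield a TableSwap of the two tables'
# URNs; parse_ddl_query(rename_payload) would yield a TableRename; any other
# DDL increments num_ddl_queries_dropped and returns None.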


class SnowflakeQueriesSource(Source):
def __init__(self, ctx: PipelineContext, config: SnowflakeQueriesSourceConfig):
@@ -524,6 +583,7 @@ def _build_enriched_query_log_query(
user_name,
direct_objects_accessed,
objects_modified,
object_modified_by_ddl
FROM
snowflake.account_usage.access_history
WHERE
@@ -545,8 +605,9 @@
) as direct_objects_accessed,
-- TODO: Drop the columns.baseSources subfield.
FILTER(objects_modified, o -> o:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER}) as objects_modified,
case when object_modified_by_ddl:objectDomain IN {SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER} then object_modified_by_ddl else null end as object_modified_by_ddl
FROM raw_access_history
WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 )
WHERE ( array_size(direct_objects_accessed) > 0 or array_size(objects_modified) > 0 or object_modified_by_ddl is not null )
)
, query_access_history AS (
SELECT
@@ -568,8 +629,12 @@
q.role_name AS "ROLE_NAME",
a.direct_objects_accessed,
a.objects_modified,
a.object_modified_by_ddl
FROM deduplicated_queries q
JOIN filtered_access_history a USING (query_id)
-- This QUALIFY ignores all but one row per query, e.g. the duplicate rows produced by a DDL swap
QUALIFY
ROW_NUMBER() OVER (PARTITION BY query_id ORDER BY start_time DESC) = 1
Collaborator: we have the deduplicated_queries CTE - should we push stuff down to that?

Collaborator (author): this runs after the join with access_history; the access_history table may have more than one row for the same query.

Collaborator (author): As we are now ignoring duplicate swaps, we can in fact remove this QUALIFY, as it is fine to process the same swap query multiple times.
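
To make the discussion concrete, a toy Python version of the dedup the QUALIFY performs (sample rows assumed; a swap may log one access_history row per table touched):

rows = [
    {"query_id": "q1", "start_time": 2, "objects": "ORDERS"},
    {"query_id": "q1", "start_time": 1, "objects": "ORDERS_STAGING"},
    {"query_id": "q2", "start_time": 5, "objects": "CUSTOMERS"},
]

# Mirrors ROW_NUMBER() OVER (PARTITION BY query_id ORDER BY start_time DESC) = 1:
# keep only the latest-start_time row for each query_id.
latest = {}
for row in sorted(rows, key=lambda r: r["start_time"], reverse=True):
    latest.setdefault(row["query_id"], row)  # first seen = latest start_time

assert latest["q1"]["objects"] == "ORDERS"
assert len(latest) == 2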

)
SELECT * FROM query_access_history
"""
@@ -174,6 +174,18 @@ class KnownLineageMapping:
lineage_type: str = models.DatasetLineageTypeClass.COPY


@dataclasses.dataclass
class TableRename:
original_urn: UrnStr
new_urn: UrnStr


@dataclasses.dataclass
class TableSwap:
urn1: UrnStr
urn2: UrnStr


@dataclasses.dataclass
class PreparsedQuery:
# If not provided, we will generate one using the fast fingerprint generator.
@@ -237,6 +249,7 @@ class SqlAggregatorReport(Report):
num_preparsed_queries: int = 0
num_known_mapping_lineage: int = 0
num_table_renames: int = 0
num_table_swaps: int = 0

# Temp tables.
num_temp_sessions: Optional[int] = None
@@ -526,7 +539,12 @@ def is_allowed_table(self, urn: UrnStr) -> bool:
def add(
self,
item: Union[
KnownQueryLineageInfo, KnownLineageMapping, PreparsedQuery, ObservedQuery
KnownQueryLineageInfo,
KnownLineageMapping,
PreparsedQuery,
ObservedQuery,
TableRename,
TableSwap,
],
) -> None:
if isinstance(item, KnownQueryLineageInfo):
@@ -537,6 +555,10 @@
self.add_preparsed_query(item)
elif isinstance(item, ObservedQuery):
self.add_observed_query(item)
elif isinstance(item, TableRename):
self.add_table_rename(item.original_urn, item.new_urn)
elif isinstance(item, TableSwap):
self.add_table_swap(item.urn1, item.urn2)
else:
raise ValueError(f"Cannot add unknown item type: {type(item)}")

@@ -892,11 +914,8 @@ def add_table_rename(
) -> None:
"""Add a table rename to the aggregator.

This ensures that all _future_ observed queries that reference the original urn
will instead generate usage and lineage for the new urn.

Currently, this does not affect any queries that have already been observed.
TODO: Add a mechanism to update the lineage for queries that have already been observed.
This makes all observed queries that reference the original urn
generate lineage for the new urn instead.

Args:
original_urn: The original dataset URN.
@@ -905,9 +924,43 @@

self.report.num_table_renames += 1

if original_urn in self._lineage_map:
self._lineage_map[new_urn] = self._lineage_map.pop(original_urn)

# This will not work if the table is renamed multiple times.
self._table_renames[original_urn] = new_urn
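
To make the caveat above concrete, a toy model of the rename bookkeeping, with plain dicts standing in for _lineage_map and _table_renames:

lineage_map = {"urn:old": {"query-1"}}
table_renames = {}

def rename(original_urn, new_urn):
    # Move lineage already recorded for the original urn to the new urn.
    if original_urn in lineage_map:
        lineage_map[new_urn] = lineage_map.pop(original_urn)
    # Future references to original_urn resolve to new_urn.
    table_renames[original_urn] = new_urn

rename("urn:old", "urn:new")
assert lineage_map == {"urn:new": {"query-1"}}

# The caveat: renames do not chain. After a second rename of the same table,
rename("urn:new", "urn:newer")
# resolving "urn:old" still gives "urn:new", not "urn:newer".
assert table_renames["urn:old"] == "urn:new"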

def add_table_swap(
self,
urn1: UrnStr,
urn2: UrnStr,
) -> None:
"""Add a table swap to the aggregator.

Args:
urn1, urn2: The dataset URNs to swap.
"""

self.report.num_table_swaps += 1

urn1_lineage: Optional[OrderedSet[QueryId]] = None
urn2_lineage: Optional[OrderedSet[QueryId]] = None

if urn1 in self._lineage_map:
urn1_lineage = self._lineage_map.pop(urn1)

if urn2 in self._lineage_map:
urn2_lineage = self._lineage_map.pop(urn2)

if urn1_lineage:
self._lineage_map[urn2] = urn1_lineage

if urn2_lineage:
self._lineage_map[urn1] = urn2_lineage

self._table_renames[urn1] = urn2
self._table_renames[urn2] = urn1
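
The same toy model for a swap: the two URNs exchange any accumulated lineage, and the rename mapping is recorded in both directions. This matches the common pattern of building a staging table and swapping it into place.

lineage_map = {"urn:orders": {"q1"}, "urn:orders_staging": {"q2"}}
table_renames = {}

def swap(urn1, urn2):
    l1 = lineage_map.pop(urn1, None)
    l2 = lineage_map.pop(urn2, None)
    if l1:
        lineage_map[urn2] = l1
    if l2:
        lineage_map[urn1] = l2
    # Bidirectional mapping, unlike a rename.
    table_renames[urn1] = urn2
    table_renames[urn2] = urn1

swap("urn:orders", "urn:orders_staging")
assert lineage_map == {"urn:orders_staging": {"q1"}, "urn:orders": {"q2"}}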

def _make_schema_resolver_for_session(
self, session_id: str
) -> SchemaResolverInterface: