Skip to content

Commit

Permalink
fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Jul 17, 2024
1 parent abfc574 commit dcb786f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,9 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:
)
)

# TODO implement email address mapping
user = CorpUserUrn(res["user_name"])
# TODO: Fetch email addresses from Snowflake to map user -> email
# TODO: Support email_domain fallback for generating user urns.
user = CorpUserUrn(self.identifiers.snowflake_identifier(res["user_name"]))

timestamp: datetime = res["query_start_time"]
timestamp = timestamp.astimezone(timezone.utc)
Expand All @@ -380,14 +381,18 @@ def _parse_audit_log_row(self, row: Dict[str, Any]) -> PreparsedQuery:
)

entry = PreparsedQuery(
query_id=res["query_fingerprint"],
# Despite having Snowflake's fingerprints available, our own fingerprinting logic does a better
# job at eliminating redundant / repetitive queries. As such, we don't include the fingerprint
# here so that the aggregator auto-generates one.
# query_id=res["query_fingerprint"],
query_id=None,
query_text=res["query_text"],
upstreams=upstreams,
downstream=downstream,
column_lineage=column_lineage,
column_usage=column_usage,
inferred_schema=None,
confidence_score=1,
confidence_score=1.0,
query_count=res["query_count"],
user=user,
timestamp=timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
self.aggregator: Optional[SqlParsingAggregator] = None

if self.config.include_table_lineage:
if self.config.use_queries_v2 or self.config.include_table_lineage:
self.aggregator = SqlParsingAggregator(
platform=self.identifiers.platform,
platform_instance=self.config.platform_instance,
Expand All @@ -179,6 +179,8 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
)
self.report.sql_aggregator = self.aggregator.report

if self.config.include_table_lineage:
assert self.aggregator is not None
redundant_lineage_run_skip_handler: Optional[
RedundantLineageRunSkipHandler
] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ def __init__(
self.generate_usage_statistics = generate_usage_statistics
self.generate_query_usage_statistics = generate_query_usage_statistics
self.generate_operations = generate_operations
if self.generate_queries and not self.generate_lineage:
raise ValueError("Queries will only be generated if lineage is enabled")
if self.generate_queries and not (
self.generate_lineage or self.generate_query_usage_statistics
):
logger.warning(
"Queries will not be generated, as neither lineage nor query usage statistics are enabled"
)

self.usage_config = usage_config
if (
Expand Down

0 comments on commit dcb786f

Please sign in to comment.