diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index bbee4587e2b4d..9f655b34177fc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -685,12 +685,7 @@ def table_upstreams_with_column_lineage( t.query_start_time AS query_start_time, t.query_id AS query_id FROM - ( - SELECT * from snowflake.account_usage.access_history - WHERE - query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) - ) t, + snowflake.account_usage.access_history t, lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r, lateral flatten(input => t.OBJECTS_MODIFIED) w, lateral flatten(input => w.value : "columns", outer => true) wcols, @@ -780,12 +775,14 @@ def table_upstreams_with_column_lineage( queries AS ( select qid.downstream_table_name, qid.query_id, query_history.query_text, query_history.start_time from query_ids qid - JOIN ( + LEFT JOIN ( SELECT * FROM snowflake.account_usage.query_history WHERE query_history.start_time >= to_timestamp_ltz({start_time_millis}, 3) AND query_history.start_time < to_timestamp_ltz({end_time_millis}, 3) ) query_history on qid.query_id = query_history.query_id + WHERE qid.query_id is not null + AND query_history.query_text is not null ) SELECT h.downstream_table_name AS "DOWNSTREAM_TABLE_NAME", @@ -850,12 +847,7 @@ def table_upstreams_only( t.query_start_time AS query_start_time, t.query_id AS query_id FROM - ( - SELECT * from snowflake.account_usage.access_history - WHERE - query_start_time >= to_timestamp_ltz({start_time_millis}, 3) - AND query_start_time < to_timestamp_ltz({end_time_millis}, 3) - ) t, + snowflake.account_usage.access_history t, lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) r, lateral flatten(input => t.OBJECTS_MODIFIED) w WHERE