
SnowflakeSqlApiOperator (without deferrable flag) marks task as SUCCESS before Snowflake job completes #46648

@subbota19

Description


Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.3

What happened?

The issue occurs when executing one or more SQL statements with SnowflakeSqlApiOperator without the deferrable flag. After the first status poll against Snowflake (with poll_interval set to 5 seconds by default), the triggered Snowflake job continues running, yet Airflow marks the task as successful even though the job is still in progress.

What you think should happen instead?

Instead of marking the task as successful immediately, the Airflow task should wait until the Snowflake job has fully completed, and then be marked as successful or failed accordingly.

How to reproduce

Use the following Airflow task:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

    test = SnowflakeSqlApiOperator(
        task_id="test",
        sql="SELECT SYSTEM$WAIT(10);",
        warehouse="WAREHOUSE_ID",
        database="DATABASE_ID",
    )

At the end of the query execution, the following log message appears:

[2025-02-11, 14:54:59 UTC] {snowflake_sql_api.py:283} INFO - {'code': '333334', 'message': 'Asynchronous execution in progress. Use provided query id to perform query monitoring and management.', 'statementHandle': '01ba52be-0103-fdba-0000-8c0d3c1e25ca', 'statementStatusUrl': '/api/v2/statements/01ba52be-0103-fdba-0000-8c0d3c1e25ca'}
[2025-02-11, 14:54:59 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS.

However, the task should wait for this job to finish before being marked as successful or failed.

Operating System

Linux 5.15.153.1-microsoft-standard-WSL2 x86_64 GNU/Linux

Versions of Apache Airflow Providers

apache-airflow-providers-snowflake==6.0.0

Deployment

Astronomer

Deployment details

Astro Runtime 12.4.0 (Based on Airflow 2.10.3)

Anything else?

I want to highlight that in deferrable mode, Airflow uses a trigger to check the job status: every time the poll interval elapses, the trigger checks the status and yields an event on this line.

However, when not using the deferrable logic, execution follows the else branch, which contains no while loop to monitor execution continuously. Instead, on this line, the task iterates once over the active Snowflake jobs, checks their status, and then sleeps.

This means:

  • If a job finishes in less than 5 seconds, the logic works correctly.

  • If a job takes longer than 5 seconds, there is no mechanism to keep monitoring it, and the task incorrectly finishes once the for loop ends, even if the Snowflake job is still running.
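To illustrate the two bullets above, here is a minimal, self-contained simulation of the current single-pass behaviour. The helper names and the constant values are hypothetical stand-ins for the provider's internals, not the actual operator code:

```python
POLL_INTERVAL = 5   # seconds; the provider's default poll_interval
JOB_DURATION = 10   # seconds the Snowflake job actually takes


def job_is_running(elapsed):
    """Stand-in for a Snowflake status check: the job runs for JOB_DURATION seconds."""
    return elapsed < JOB_DURATION


def single_pass_poll(query_ids):
    """Mimics the non-deferrable path: one status check per query id, then return.

    There is no outer loop, so a job that outlives poll_interval is never
    re-checked before the operator returns.
    """
    elapsed = 0
    for _ in query_ids:
        elapsed += POLL_INTERVAL   # the real code sleeps poll_interval here
    return job_is_running(elapsed)  # True means the job was left running


# One query, one pass: after 5 simulated seconds the 10-second job is
# still running, yet the operator would already mark the task SUCCESS.
print(single_pass_poll(["01ba52be-0103-fdba-0000-8c0d3c1e25ca"]))  # True
```

A job shorter than poll_interval would return False here, which matches the first bullet: the logic only happens to work for fast queries.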

Suggested Fix
I propose adding a running status check inside poll_on_queries and replacing the else branch with a loop, so that Airflow waits until all queries finish:

        else:
            while True:
                statement_status = self.poll_on_queries()
                if statement_status["error"]:
                    raise AirflowException(statement_status["error"])
                if not statement_status["running"]:
                    break
            self._hook.check_query_output(self.query_ids)

With this change, Airflow would periodically call poll_on_queries() and continue waiting until all jobs complete.
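The same simulation as above, but with the proposed loop, shows the task now waiting out the job. The stubbed poll_on_queries below returns the dict shape the fix assumes (an "error" key plus the new "running" key); it is an illustration, not the provider's real method:

```python
POLL_INTERVAL = 5   # seconds; the provider's default poll_interval
JOB_DURATION = 10   # seconds the Snowflake job actually takes


def poll_on_queries(elapsed):
    """Stubbed poll_on_queries with the proposed 'running' key added."""
    return {"error": {}, "running": elapsed < JOB_DURATION}


def loop_until_done():
    """Mimics the proposed else branch: keep polling until no query is running."""
    elapsed = 0
    while True:
        status = poll_on_queries(elapsed)
        if status["error"]:
            raise RuntimeError(status["error"])
        if not status["running"]:
            break
        elapsed += POLL_INTERVAL  # the real code sleeps between polls
    return elapsed


# Two polls (at 0s and 5s) find the job running; the third (at 10s) does
# not, so only then does the task proceed to check_query_output.
print(loop_until_done())
```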

As part of my testing, I ran the operator with these changes applied, and this is the log, showing the expected result at the end:

[2025-02-11, 14:55:45 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL GET statements status API response: {'code': '333334', 'message': 'Asynchronous execution in progress. Use provided query id to perform query monitoring and management.', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e'}
[2025-02-11, 14:55:50 UTC] {snowflake.py:80} INFO - checking : 01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
[2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:315} INFO - Retrieving status for query id 01ba52bf-0103-fe23-0000-8c0d3c1dcb3e
[2025-02-11, 14:55:50 UTC] {snowflake_sql_api.py:289} INFO - Snowflake SQL GET statements status API response: {'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema': '', 'table': '', 'nullable': False, 'length': 16777216, 'type': 'text', 'scale': None, 'precision': None, 'byteLength': 16777216, 'collation': None}]}, 'data': [['Multiple statements executed successfully.']], 'code': '090001', 'statementHandles': ['01ba52bf-0103-fe23-0000-8c0d3c1dcb42', '01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message': 'Statement executed successfully.', 'createdOn': 1739285703540}
[2025-02-11, 14:55:55 UTC] {snowflake_sql_api.py:283} INFO - {'resultSetMetaData': {'numRows': 1, 'format': 'jsonv2', 'partitionInfo': [{'rowCount': 1, 'uncompressedSize': 57}], 'rowType': [{'name': 'multiple statement execution', 'database': '', 'schema': '', 'table': '', 'nullable': False, 'length': 16777216, 'type': 'text', 'scale': None, 'precision': None, 'byteLength': 16777216, 'collation': None}]}, 'data': [['Multiple statements executed successfully.']], 'code': '090001', 'statementHandles': ['01ba52bf-0103-fe23-0000-8c0d3c1dcb42', '01ba52bf-0103-fe23-0000-8c0d3c1dcb4a'], 'statementStatusUrl': '/api/v2/statements/01ba52bf-0103-fe23-0000-8c0d3c1dcb3e?requestId=a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'requestId': 'a85631a6-fa5d-4a0e-b4ca-e9800ac36191', 'sqlState': '00000', 'statementHandle': '01ba52bf-0103-fe23-0000-8c0d3c1dcb3e', 'message': 'Statement executed successfully.', 'createdOn': 1739285703540}
[2025-02-11, 14:55:55 UTC] {taskinstance.py:340} ▼ Post task execution logs
[2025-02-11, 14:55:55 UTC] {taskinstance.py:352} INFO - Marking task as SUCCESS.

Finally, I want to note that long-running jobs should of course use the deferrable flag, but as long as the non-deferrable option remains available, its logic should still function correctly.
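For completeness, this is a sketch of the deferrable alternative for the reproduction case above (same hypothetical warehouse and database identifiers as the reproduction snippet):

```python
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator

# deferrable=True hands polling off to a trigger, so long-running
# statements are monitored without occupying a worker slot.
test_deferrable = SnowflakeSqlApiOperator(
    task_id="test_deferrable",
    sql="SELECT SYSTEM$WAIT(10);",
    warehouse="WAREHOUSE_ID",
    database="DATABASE_ID",
    deferrable=True,
)
```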

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
