Fix: SnowflakeSqlApiOperator (without deferrable flag) incorrectly marks the task as successful even when jobs are still active #46672
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
You need to resolve conflicts now
Hi @potiuk!
Hi @rawwar!
I think you should be able to mark the conversations as resolved. If not, a committer will be able to help. I don't have permission to mark the conversation as resolved.
Hi @potiuk!
@subbota19, take changes from main. Test failures are already fixed.
… Before Snowflake Job Completes (apache#46648): implementation & tests
Hi @rawwar, thx, I've rebased onto the main branch
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
… Before Snowflake Job Completes (apache#46648): implementation & tests (apache#46672)
Fix #46648.
Problem Description
In deferrable mode, Airflow uses a trigger to check the job status: each time the trigger's polling interval elapses, it checks the status and yields an event on this line.
However, when deferrable mode is not used, execution follows the else branch, which has no while loop to keep monitoring the jobs. Instead, on this line, the task iterates once through the active Snowflake jobs, checks the status of each, and then sleeps for poll_interval.
This results in the following behavior:
✅ If a job finishes in less than 5 seconds (the time.sleep duration with the default poll_interval), the logic works correctly.
❌ If a job takes longer than 5 seconds, nothing keeps monitoring it, and the task incorrectly finishes as soon as the loop ends, even if the Snowflake job is still running.
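The two cases above can be reproduced with a small stand-alone sketch. Everything in it is a hypothetical stand-in rather than the provider's actual code: `FakeHook`, `single_pass_poll`, and the status strings `"running"` and `"success"` are assumptions chosen only to show why a single pass over the jobs is not enough.

```python
import time


class FakeHook:
    """Hypothetical stand-in for a Snowflake hook: reports 'running' for the
    first `running_checks` status calls, then 'success'."""

    def __init__(self, running_checks):
        self.remaining = running_checks

    def get_status(self, query_id):
        if self.remaining > 0:
            self.remaining -= 1
            return "running"
        return "success"


def single_pass_poll(hook, query_ids, poll_interval=0.0):
    """Check each query once, sleep after each check, then return.

    Returns the statuses seen; nothing re-checks queries that were
    still running when they were inspected.
    """
    seen = {}
    for query_id in query_ids:
        seen[query_id] = hook.get_status(query_id)
        time.sleep(poll_interval)
    return seen


fast = single_pass_poll(FakeHook(running_checks=0), ["q1"])
slow = single_pass_poll(FakeHook(running_checks=2), ["q1"])
print(fast)  # {'q1': 'success'}
print(slow)  # {'q1': 'running'}
```

In the second call the function returns while the job is still reported as running, which mirrors the task being marked successful too early.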
Suggested Fix
I propose adding a running-status check inside poll_on_queries and wrapping the logic in the else branch in a loop, so that Airflow waits until all queries finish. The logic is quite similar to the while loop used in the deferrable trigger.
With this change, the Airflow task would periodically call poll_on_queries and continue waiting until all jobs are complete.
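A minimal sketch of the waiting loop described above, not the merged implementation. It assumes a hypothetical callable `poll_once` that returns a dict with `"running"` and `"error"` lists of query ids; the real return shape of poll_on_queries may differ, and `RuntimeError` stands in for AirflowException so the example runs without Airflow installed.

```python
import time


def wait_until_done(poll_once, poll_interval=5.0, sleep=time.sleep):
    """Call `poll_once` repeatedly until no query is reported as running.

    `poll_once` is a hypothetical callable returning a dict with "running"
    and "error" lists of query ids. Returns the number of polls made.
    """
    polls = 0
    while True:
        status = poll_once()
        polls += 1
        if status["error"]:
            # Stand-in for raising AirflowException in the operator.
            raise RuntimeError(f"queries failed: {status['error']}")
        if not status["running"]:
            return polls
        sleep(poll_interval)


# Scripted responses: the job is running for two polls, then finished.
responses = iter([
    {"running": ["q1"], "error": []},
    {"running": ["q1"], "error": []},
    {"running": [], "error": []},
])
sleeps = []  # record sleep durations instead of actually sleeping
polls = wait_until_done(lambda: next(responses), sleep=sleeps.append)
print(polls, sleeps)  # 3 [5.0, 5.0]
```

Passing `sleep` as a parameter is only a convenience for the sketch: it lets the loop be exercised without real delays.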
Tests
Simulates a scenario where the Snowflake job is still running for two checks (10 seconds apart, assuming the default poll_interval); after the second check, the job completes successfully and the task is marked as successful.
Simulates a scenario where the Snowflake job fails and ensures that an AirflowException is raised.
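The two scenarios can be sketched with `unittest.mock` as follows. This is an illustration, not the tests added in the PR: `run_until_complete`, `JobFailed` (a stand-in for AirflowException), and the status strings are hypothetical, and the first test assumes "running for two checks" means the third status call returns success.

```python
from unittest import mock


class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def run_until_complete(hook, query_id, sleep):
    """Hypothetical code under test: poll until the job leaves 'running'."""
    while True:
        status = hook.get_status(query_id)
        if status == "error":
            raise JobFailed(query_id)
        if status == "success":
            return
        sleep()


def test_running_twice_then_success():
    hook = mock.Mock()
    # side_effect yields one scripted status per call to get_status.
    hook.get_status.side_effect = ["running", "running", "success"]
    sleep = mock.Mock()
    run_until_complete(hook, "q1", sleep)
    assert hook.get_status.call_count == 3
    assert sleep.call_count == 2


def test_failure_raises():
    hook = mock.Mock()
    hook.get_status.side_effect = ["error"]
    try:
        run_until_complete(hook, "q1", mock.Mock())
    except JobFailed:
        raised = True
    else:
        raised = False
    assert raised


test_running_twice_then_success()
test_failure_raises()
```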