-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Optimize deferrable execution mode for DbtCloudRunJobOperator
#31188
Conversation
self.log.info("Job run %s has completed successfully.", str(self.run_id)) | ||
return self.run_id | ||
elif job_run_status in ( | ||
DbtCloudJobRunStatus.CANCELLED.value, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we want to handle the cancelled state? If the user has manually cancelled the job and does not want further processing, we should not retry such cancelled jobs. By default, Airflow will retry such tasks. We could raise AirflowFailException
in such cases so that Airflow does not retry those tasks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question. I think this could be handled in a separate PR though.
I could see hard-failing the task on user cancellation being unexpected or expected. Perhaps this could be a new parameter to control how cancelled runs are handled?
method_name="execute_complete", | ||
) | ||
job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id) | ||
job_run_status = self.hook.get_job_run_status(**job_run_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we log the state here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hook method does have logging lines of
self.log.info("Getting the status of job run %s.", str(run_id))
and
self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)
which should handle the state logging.
method_name="execute_complete", | ||
) | ||
job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id) | ||
job_run_status = self.hook.get_job_run_status(**job_run_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This hook method does have logging lines of
self.log.info("Getting the status of job run %s.", str(run_id))
and
self.log.info(
"Current status of job run %s: %s", str(run_id), DbtCloudJobRunStatus(job_run_status).name
)
which should handle the state logging.
self.log.info("Job run %s has completed successfully.", str(self.run_id)) | ||
return self.run_id | ||
elif job_run_status in ( | ||
DbtCloudJobRunStatus.CANCELLED.value, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great question. I think this could be handled in a separate PR though.
I could see hard-failing the task on user cancellation being unexpected or expected. Perhaps this could be a new parameter to control how cancelled runs are handled?
In deferrable mode for DbtCloudRunJobOperator, we should first check if job is in terminal state or not in the execute method and only defer if that is not in terminal state. This way we don’t run an unnecessary deferral cycle if the condition is already true.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.