-
Notifications
You must be signed in to change notification settings - Fork 14.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EMR Serverless Fix for Jobs marked as success even on failure #26218
EMR Serverless Fix for Jobs marked as success even on failure #26218
Conversation
@@ -650,7 +650,7 @@ def execute(self, context: 'Context') -> Dict: | |||
'jobRunId': response['jobRunId'], | |||
}, | |||
parse_response=['jobRun', 'state'], | |||
desired_state=EmrServerlessJobSensor.TERMINAL_STATES, | |||
desired_state=EmrServerlessJobSensor.SUCCESS_STATES, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have test coverage for this?
also Operator imports from Sensor ? 🤯
The statuses should be defined on the Hook
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Examples:
airflow/airflow/providers/amazon/aws/hooks/quicksight.py
Lines 39 to 40 in 9c59831
NON_TERMINAL_STATES = {"INITIALIZED", "QUEUED", "RUNNING"} | |
FAILED_STATES = {"FAILED"} |
airflow/airflow/providers/amazon/aws/hooks/emr.py
Lines 178 to 194 in 6b7a343
INTERMEDIATE_STATES = ( | |
"PENDING", | |
"SUBMITTED", | |
"RUNNING", | |
) | |
FAILURE_STATES = ( | |
"FAILED", | |
"CANCELLED", | |
"CANCEL_PENDING", | |
) | |
SUCCESS_STATES = ("COMPLETED",) | |
TERMINAL_STATES = ( | |
"COMPLETED", | |
"FAILED", | |
"CANCELLED", | |
"CANCEL_PENDING", | |
) |
You need to rebase and solve conflicts @syedahsn |
…ES rather than TERMINAL_STATES. This makes it so that the task is marked as a failure if the job doesn't run successfully.
Add test to cover job failure exception.
58bd33b
to
8921c49
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
An issue was found where Airflow would mark a task as SUCCESS even if the EMR job failed.
The solution here is to set the desired state in EmrServerlessStartJobOperator to SUCCESS_STATES rather than TERMINAL_STATES. This makes it so that the task is marked as a failure if the job doesn't run successfully.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.