Skip to content

Comments

add wait_policy in EmrCreateJobFlorOperator#61195

Merged
vincbeck merged 1 commit intoapache:mainfrom
henry3260:add-wait-policy
Feb 3, 2026
Merged

add wait_policy in EmrCreateJobFlorOperator#61195
vincbeck merged 1 commit intoapache:mainfrom
henry3260:add-wait-policy

Conversation

@henry3260
Copy link
Contributor

@henry3260 henry3260 commented Jan 29, 2026

What

This PR fixes a bug in EmrCreateJobFlowOperator where the wait_policy parameter was ignored, causing the operator to always default to the JobFlowWaiting waiter (waiting for the cluster to start) regardless of the user's input.

The changes include:

Persisting Parameters: Ensuring wait_policy is correctly persisted in the operator instance rather than being converted to a boolean and lost.

Smart Initialization Logic: Updating init to allow users to provide both wait_for_completion and wait_policy without conflict.

If wait_policy is provided, wait_for_completion is implied to be True (unless explicitly set to False).

If only wait_for_completion=True is provided, wait_policy defaults to WAIT_FOR_COMPLETION for backward compatibility.

Execution Update: Updating the execute method to select the correct boto3 waiter based on the stored wait_policy.

Deferrable Support: Passing the specific waiter name to EmrCreateJobFlowTrigger to support this logic in deferrable mode.

Why

Currently, if a user wants the operator to wait until the EMR cluster finishes all steps and terminates (using WaitPolicy.WAIT_FOR_STEPS_COMPLETION), the operator fails to do so.

It converts the policy to a boolean wait_for_completion = True in init and discards the specific policy type. Consequently, the execute method hardcodes the waiter to WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION].

This behavior causes the task to be marked as "Success" as soon as the cluster enters the WAITING state. If the cluster subsequently fails during a step execution, Airflow does not catch the failure, leading to false positives in DAG runs.

closes: #61180

  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@boring-cyborg boring-cyborg bot added area:providers provider:amazon AWS/Amazon - related issues labels Jan 29, 2026
@henry3260 henry3260 force-pushed the add-wait-policy branch 2 times, most recently from 6a6d427 to 01db99b Compare January 29, 2026 09:28
@henry3260 henry3260 marked this pull request as ready for review January 29, 2026 10:14
@henry3260 henry3260 requested a review from o-nikolas as a code owner January 29, 2026 10:14
@karenbraganz
Copy link
Collaborator

karenbraganz commented Jan 29, 2026

I think the logic might be less confusing for users if you allow them to set both wait_for_completion and wait_policy without raising a ValueError. If wait_for_completion is set to True, the code would check wait_policy. If wait_policy is not set, it defaults to WAIT_FOR_COMPLETE. This shouldn't interfere with your logic too much because you are anyway setting wait_for_completion to True on line 711.

If you want to use the current logic, I think you should clarify a few things in the param descriptions:

  1. For wait_for_completion set to True, make it clear that the task will succeed once the cluster starts running and does not wait until termination. Use wait_policy to change this behavior.
  2. For wait_policy state that this should not be used unless wait_for_termination is set to False.

@jroachgolf84
Copy link
Collaborator

Are we "un-deprecating" wait_policy? Is there a way to do this without "un-deprecating" it?

@karenbraganz
Copy link
Collaborator

@jroachgolf84 do you know why it was deprecated? Imo we should allow users to configure wait_policy because not all of them may want to wait only till the cluster starts running. It gives them more flexibility to set the success criterion.

@o-nikolas
Copy link
Contributor

@jroachgolf84 do you know why it was deprecated? Imo we should allow users to configure wait_policy because not all of them may want to wait only till the cluster starts running. It gives them more flexibility to set the success criterion.

It looks to me like there was a bit of confusion maybe before? There was an issue created here that requested that waiter_policy be removed because:

The operator EmrCreateJobFlowOperator should follow the same standards as the other operator. All operators in Amazon provider package use the parameter wait_for_completion to configure whether the user wants to wait. However, in EmrCreateJobFlowOperator, this parameter is named wait_policy and is not a boolean but an enum.

It looks to me like the EmrCreateJobFlowOperator used waiter_policy in addition to wait_for_completion. I quite like the pattern outlined in the Description here where wait_for_completion decides whether to wait and waiter_policy decides how to wait (but still inferring the former from the latter so that both don't need specifying).

Tagging @vincbeck who created the original issue to remove waiter_policy

@o-nikolas o-nikolas requested a review from vincbeck February 1, 2026 01:31
@vincbeck
Copy link
Contributor

vincbeck commented Feb 2, 2026

Let's try to untangle things here :) A per my understanding, before the operator had only wait_policy to configure whether you want to wait or not. That's why I created #56153, for consistency purposes, all across AWS operator we use wait_for_completion to configure whether you want the operator to wait. Hence, we should keep it.

Now, it seems you also want to have different options of waiting, which is fine. I do not think it is a problem to un-deprecate a parameter either. Though, we need to be careful and be backward compatible and handle all the different possibilities:

  • If wait_for_completion is True:
    • If wait_policy is not defined (or None), default should be WaitPolicy.WAIT_FOR_COMPLETION. As a user, I should not have to set 2 parameters (wait_for_completion and wait_policy) if I want to wait
  • If wait_for_completion is False:
    • If wait_policy is set, then set wait_for_completion to True and raise a deprecation warning saying that wait_for_completion must be set to True in order to wait

What do you think?

@henry3260
Copy link
Contributor Author

Let's try to untangle things here :) A per my understanding, before the operator had only wait_policy to configure whether you want to wait or not. That's why I created #56153, for consistency purposes, all across AWS operator we use wait_for_completion to configure whether you want the operator to wait. Hence, we should keep it.

Now, it seems you also want to have different options of waiting, which is fine. I do not think it is a problem to un-deprecate a parameter either. Though, we need to be careful and be backward compatible and handle all the different possibilities:

  • If wait_for_completion is True:

    • If wait_policy is not defined (or None), default should be WaitPolicy.WAIT_FOR_COMPLETION. As a user, I should not have to set 2 parameters (wait_for_completion and wait_policy) if I want to wait
  • If wait_for_completion is False:

    • If wait_policy is set, then set wait_for_completion to True and raise a deprecation warning saying that wait_for_completion must be set to True in order to wait

What do you think?

Thanks for untangling this! The logic you proposed makes perfect sense. If everyone agrees with this proposal, I will proceed with the changes and add the deprecation warning

The operator was incorrectly ignoring the `wait_policy` argument and always defaulting to waiting for cluster completion.

This change ensures the `wait_policy` is correctly persisted and used to select the appropriate waiter (e.g., for step completion), fixing the hardcoded behavior.
@vincbeck vincbeck merged commit e8b4a53 into apache:main Feb 3, 2026
90 checks passed
@henry3260 henry3260 deleted the add-wait-policy branch February 3, 2026 18:26
Alok-kumar-priyadarshi pushed a commit to Alok-kumar-priyadarshi/airflow that referenced this pull request Feb 5, 2026
The operator was incorrectly ignoring the `wait_policy` argument and always defaulting to waiting for cluster completion.

This change ensures the `wait_policy` is correctly persisted and used to select the appropriate waiter (e.g., for step completion), fixing the hardcoded behavior.
jhgoebbert pushed a commit to jhgoebbert/airflow_Owen-CH-Leung that referenced this pull request Feb 8, 2026
The operator was incorrectly ignoring the `wait_policy` argument and always defaulting to waiting for cluster completion.

This change ensures the `wait_policy` is correctly persisted and used to select the appropriate waiter (e.g., for step completion), fixing the hardcoded behavior.
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request Feb 15, 2026
The operator was incorrectly ignoring the `wait_policy` argument and always defaulting to waiting for cluster completion.

This change ensures the `wait_policy` is correctly persisted and used to select the appropriate waiter (e.g., for step completion), fixing the hardcoded behavior.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:providers provider:amazon AWS/Amazon - related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

wait_for_completion in EmrCreateJobFlorOperator should wait for the cluster to complete to return success

5 participants