From 68a986e09f65465711bc8d52c3597bdcc1b9f0a2 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sat, 27 Sep 2025 04:49:21 +0530 Subject: [PATCH 1/8] refactor: deprecate wait_policy in EmrCreateJobFlowOperator in favor of wait_for_completion --- .../airflow/providers/amazon/aws/operators/emr.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index 23487b687baa1..685605a407b3a 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -698,19 +698,19 @@ def __init__( super().__init__(**kwargs) self.emr_conn_id = emr_conn_id self.job_flow_overrides = job_flow_overrides or {} - self.wait_policy = wait_policy + self.wait_for_completion = wait_for_completion self.waiter_max_attempts = waiter_max_attempts or 60 self.waiter_delay = waiter_delay or 60 self.deferrable = deferrable - if wait_for_completion is not None: + if wait_policy is not None: warnings.warn( - "`wait_for_completion` parameter is deprecated, please use `wait_policy` instead.", + "`wait_policy` parameter is deprecated, please use `wait_for_completion` instead.", AirflowProviderDeprecationWarning, stacklevel=2, ) # preserve previous behaviour - self.wait_policy = WaitPolicy.WAIT_FOR_COMPLETION if wait_for_completion else None + self.wait_for_completion = wait_policy == WaitPolicy.WAIT_FOR_COMPLETION @property def _hook_parameters(self): @@ -748,8 +748,8 @@ def execute(self, context: Context) -> str | None: job_flow_id=self._job_flow_id, log_uri=get_log_uri(emr_client=self.hook.conn, job_flow_id=self._job_flow_id), ) - if self.wait_policy: - waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy] + if self.wait_for_completion: + waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] if self.deferrable: self.defer( From 364c4232cbef09dff07e77e66b487265bf76a737 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sun, 28 Sep 2025 00:39:05 +0530 Subject: [PATCH 2/8] added unit test for refactor --- .../providers/amazon/aws/operators/emr.py | 5 ++- .../aws/operators/test_emr_create_job_flow.py | 32 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index 685605a407b3a..e8351202b5eac 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -654,11 +654,10 @@ class EmrCreateJobFlowOperator(AwsBaseOperator[EmrHook]): :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. :param verify: Whether or not to verify SSL certificates. See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html - :param wait_for_completion: Deprecated - use `wait_policy` instead. - Whether to finish task immediately after creation (False) or wait for jobflow + :param wait_for_completion: Whether to finish task immediately after creation (False) or wait for jobflow completion (True) (default: None) - :param wait_policy: Whether to finish the task immediately after creation (None) or: + :param wait_policy: Deprecated. Use `wait_for_completion` instead. Whether to finish the task immediately after creation (None) or: - wait for the jobflow completion (WaitPolicy.WAIT_FOR_COMPLETION) - wait for the jobflow completion and cluster to terminate (WaitPolicy.WAIT_FOR_STEPS_COMPLETION) (default: None) diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py index 437d3c52d28b9..11a21fcb03597 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py @@ -26,7 +26,7 @@ from botocore.waiter import Waiter from jinja2 import StrictUndefined -from airflow.exceptions import TaskDeferred +from airflow.exceptions import AirflowProviderDeprecationWarning, TaskDeferred from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator from airflow.providers.amazon.aws.triggers.emr import EmrCreateJobFlowTrigger @@ -216,34 +216,26 @@ def test_execute_returns_job_id(self, mocked_hook_client): mocked_hook_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN assert self.operator.execute(self.mock_context) == JOB_FLOW_ID - @pytest.mark.parametrize( - "wait_policy", - [ - pytest.param(WaitPolicy.WAIT_FOR_COMPLETION, id="with wait for completion"), - pytest.param(WaitPolicy.WAIT_FOR_STEPS_COMPLETION, id="with wait for steps completion policy"), - ], - ) @mock.patch("botocore.waiter.get_service_module_name", return_value="emr") @mock.patch.object(Waiter, "wait") - def test_execute_with_wait_policy(self, mock_waiter, _, mocked_hook_client, wait_policy: WaitPolicy): + def test_execute_with_wait_for_completion(self, mock_waiter, _, mocked_hook_client): mocked_hook_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN - # Mock out the emr_client creator - self.operator.wait_policy = wait_policy + self.operator.wait_for_completion = True assert self.operator.execute(self.mock_context) == JOB_FLOW_ID mock_waiter.assert_called_once_with(mock.ANY, ClusterId=JOB_FLOW_ID, WaiterConfig=mock.ANY) - assert_expected_waiter_type(mock_waiter, WAITER_POLICY_NAME_MAPPING[wait_policy]) + assert_expected_waiter_type(mock_waiter, WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION]) def test_create_job_flow_deferrable(self, mocked_hook_client): """ Test to make sure that the operator raises a TaskDeferred exception - if run in deferrable mode and wait_policy is set. + if run in deferrable mode and wait_for_completion is set. """ mocked_hook_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN self.operator.deferrable = True - self.operator.wait_policy = WaitPolicy.WAIT_FOR_COMPLETION + self.operator.wait_for_completion = True with pytest.raises(TaskDeferred) as exc: self.operator.execute(self.mock_context) @@ -254,14 +246,22 @@ def test_create_job_flow_deferrable(self, mocked_hook_client): def test_create_job_flow_deferrable_no_wait(self, mocked_hook_client): """ Test to make sure that the operator does NOT raise a TaskDeferred exception - if run in deferrable mode but wait_policy is not set. + if run in deferrable mode but wait_for_completion is not set. """ mocked_hook_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN self.operator.deferrable = True - # wait_policy is None by default + # wait_for_completion is None by default result = self.operator.execute(self.mock_context) assert result == JOB_FLOW_ID def test_template_fields(self): validate_template_fields(self.operator) + + def test_wait_policy_deprecation_warning(self): + """Test that using wait_policy raises a deprecation warning.""" + with pytest.warns(AirflowProviderDeprecationWarning, match="`wait_policy` parameter is deprecated"): + EmrCreateJobFlowOperator( + task_id=TASK_ID, + wait_policy=WaitPolicy.WAIT_FOR_COMPLETION, + ) From f8e19442e60be1ec382df82e55c0a66ed4943f94 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sun, 28 Sep 2025 01:59:25 +0530 Subject: [PATCH 3/8] resolved copilot comments --- .../src/airflow/providers/amazon/aws/operators/emr.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index e8351202b5eac..d54b699843f4d 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -708,8 +708,12 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) + self.wait_policy = wait_policy # preserve previous behaviour - self.wait_for_completion = wait_policy == WaitPolicy.WAIT_FOR_COMPLETION + self.wait_for_completion = wait_policy in ( + WaitPolicy.WAIT_FOR_COMPLETION, + WaitPolicy.WAIT_FOR_STEPS_COMPLETION, + ) @property def _hook_parameters(self): @@ -748,7 +752,10 @@ def execute(self, context: Context) -> str | None: log_uri=get_log_uri(emr_client=self.hook.conn, job_flow_id=self._job_flow_id), ) if self.wait_for_completion: - waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] + if hasattr(self, "wait_policy") and self.wait_policy: + waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy] + else: + waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] if self.deferrable: self.defer( From 4b2895e277098ed5922b59d45b94514e2c72274e Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sun, 28 Sep 2025 02:20:21 +0530 Subject: [PATCH 4/8] resolved copilot comments --- .../amazon/src/airflow/providers/amazon/aws/operators/emr.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index d54b699843f4d..65e351a1b4279 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -701,6 +701,7 @@ def __init__( self.waiter_max_attempts = waiter_max_attempts or 60 self.waiter_delay = waiter_delay or 60 self.deferrable = deferrable + self.wait_policy = wait_policy if wait_policy is not None: warnings.warn( @@ -708,7 +709,6 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) - self.wait_policy = wait_policy # preserve previous behaviour self.wait_for_completion = wait_policy in ( WaitPolicy.WAIT_FOR_COMPLETION, @@ -752,7 +752,7 @@ def execute(self, context: Context) -> str | None: log_uri=get_log_uri(emr_client=self.hook.conn, job_flow_id=self._job_flow_id), ) if self.wait_for_completion: - if hasattr(self, "wait_policy") and self.wait_policy: + if self.wait_policy: waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy] else: waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] From bb65acffa81641eec4930de4addd2c67660ae540 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sun, 28 Sep 2025 02:59:54 +0530 Subject: [PATCH 5/8] fixed failing test --- .../providers/amazon/aws/operators/emr.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index 65e351a1b4279..a6c0775fd6565 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -709,11 +709,12 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) - # preserve previous behaviour - self.wait_for_completion = wait_policy in ( - WaitPolicy.WAIT_FOR_COMPLETION, - WaitPolicy.WAIT_FOR_STEPS_COMPLETION, - ) + if wait_policy == WaitPolicy.WAIT_FOR_COMPLETION: + self.wait_for_completion = True + elif wait_policy == WaitPolicy.WAIT_FOR_STEPS_COMPLETION: + self.wait_for_completion = "steps" + else: + self.wait_for_completion = False @property def _hook_parameters(self): @@ -752,10 +753,7 @@ def execute(self, context: Context) -> str | None: log_uri=get_log_uri(emr_client=self.hook.conn, job_flow_id=self._job_flow_id), ) if self.wait_for_completion: - if self.wait_policy: - waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy] - else: - waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] + waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] if self.deferrable: self.defer( From 57e06e608c25c58282639570be21fd05a2874e45 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Sun, 28 Sep 2025 03:43:17 +0530 Subject: [PATCH 6/8] fixed: refactor of wait_policy --- .../airflow/providers/amazon/aws/operators/emr.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index a6c0775fd6565..c3fabb9ab43ee 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -701,7 +701,6 @@ def __init__( self.waiter_max_attempts = waiter_max_attempts or 60 self.waiter_delay = waiter_delay or 60 self.deferrable = deferrable - self.wait_policy = wait_policy if wait_policy is not None: warnings.warn( @@ -709,12 +708,11 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) - if wait_policy == WaitPolicy.WAIT_FOR_COMPLETION: - self.wait_for_completion = True - elif wait_policy == WaitPolicy.WAIT_FOR_STEPS_COMPLETION: - self.wait_for_completion = "steps" - else: - self.wait_for_completion = False + self.wait_policy = wait_policy + self.wait_for_completion = wait_policy in ( + WaitPolicy.WAIT_FOR_COMPLETION, + WaitPolicy.WAIT_FOR_STEPS_COMPLETION, + ) @property def _hook_parameters(self): From c579178842542953baf168504f327ae98b6aa227 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Mon, 29 Sep 2025 01:22:17 +0530 Subject: [PATCH 7/8] ensured backward compatibility --- .../src/airflow/providers/amazon/aws/operators/emr.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index c3fabb9ab43ee..ac6373d2f5a3c 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -704,11 +704,19 @@ def __init__( if wait_policy is not None: warnings.warn( - "`wait_policy` parameter is deprecated, please use `wait_for_completion` instead.", + "`wait_policy` parameter is deprecated and will be removed in a future release; " + "please use `wait_for_completion` (bool) instead.", AirflowProviderDeprecationWarning, stacklevel=2, ) self.wait_policy = wait_policy + + if wait_for_completion is not None: + raise ValueError( + "Cannot specify both `wait_for_completion` and deprecated `wait_policy`. " + "Please use `wait_for_completion` (bool)." + ) + self.wait_for_completion = wait_policy in ( WaitPolicy.WAIT_FOR_COMPLETION, WaitPolicy.WAIT_FOR_STEPS_COMPLETION, From 6b2c25dcdc54b9f87c4de27d8a5b6cb353aa7f24 Mon Sep 17 00:00:00 2001 From: laksh-krishna-sharma Date: Tue, 30 Sep 2025 02:30:07 +0530 Subject: [PATCH 8/8] removed "self.wait_policy = wait_policy" --- .../amazon/src/airflow/providers/amazon/aws/operators/emr.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index ac6373d2f5a3c..c023bc04e14b3 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -709,7 +709,6 @@ def __init__( AirflowProviderDeprecationWarning, stacklevel=2, ) - self.wait_policy = wait_policy if wait_for_completion is not None: raise ValueError(