diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py index 48e8773c16a09..9402c1e05930a 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/emr.py @@ -701,25 +701,12 @@ def __init__( self.waiter_max_attempts = waiter_max_attempts or 60 self.waiter_delay = waiter_delay or 60 self.deferrable = deferrable + self.wait_policy = wait_policy if wait_policy is not None: - warnings.warn( - "`wait_policy` parameter is deprecated and will be removed in a future release; " - "please use `wait_for_completion` (bool) instead.", - AirflowProviderDeprecationWarning, - stacklevel=2, - ) - - if wait_for_completion is not None: - raise ValueError( - "Cannot specify both `wait_for_completion` and deprecated `wait_policy`. " - "Please use `wait_for_completion` (bool)." - ) - - self.wait_for_completion = wait_policy in ( - WaitPolicy.WAIT_FOR_COMPLETION, - WaitPolicy.WAIT_FOR_STEPS_COMPLETION, - ) + if wait_for_completion is False: + raise ValueError("Cannot specify wait_policy with wait_for_completion=False") + self.wait_for_completion = True @property def _hook_parameters(self): @@ -758,7 +745,7 @@ def execute(self, context: Context) -> str | None: log_uri=get_log_uri(emr_client=self.hook.conn, job_flow_id=self._job_flow_id), ) if self.wait_for_completion: - waiter_name = WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_COMPLETION] + waiter_name = WAITER_POLICY_NAME_MAPPING[self.wait_policy or WaitPolicy.WAIT_FOR_COMPLETION] if self.deferrable: self.defer( diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py index e49f7ba775806..941bba19d2e69 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_emr_create_job_flow.py @@ -254,10 +254,20 @@ def test_create_job_flow_deferrable_no_wait(self, mocked_hook_client): def test_template_fields(self): validate_template_fields(self.operator) - def test_wait_policy_deprecation_warning(self): - """Test that using wait_policy raises a deprecation warning.""" - with pytest.warns(AirflowProviderDeprecationWarning, match="`wait_policy` parameter is deprecated"): - EmrCreateJobFlowOperator( - task_id=TASK_ID, - wait_policy=WaitPolicy.WAIT_FOR_COMPLETION, - ) + @mock.patch("botocore.waiter.get_service_module_name", return_value="emr") + @mock.patch.object(Waiter, "wait") + def test_execute_with_wait_policy(self, mock_waiter, _, mocked_hook_client): + mocked_hook_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN + + # Test that providing wait_policy uses the correct waiter + op = EmrCreateJobFlowOperator( + task_id="test_wait_policy", + aws_conn_id="aws_default", + emr_conn_id="emr_default", + wait_policy=WaitPolicy.WAIT_FOR_STEPS_COMPLETION, + ) + + op.execute(self.mock_context) + + mock_waiter.assert_called_once_with(mock.ANY, ClusterId=JOB_FLOW_ID, WaiterConfig=mock.ANY) + assert_expected_waiter_type(mock_waiter, WAITER_POLICY_NAME_MAPPING[WaitPolicy.WAIT_FOR_STEPS_COMPLETION]) diff --git a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py index 3c2d006795ed3..b0d58d4fba7d3 100644 --- a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py +++ b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py @@ -130,7 +130,7 @@ async def get_refresh_history( """ try: response = await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", + url="https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", path_parameters={ "group_id": group_id, "dataset_id": dataset_id, @@ -203,7 +203,7 @@ async def trigger_dataset_refresh( """ try: response = await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", + url="https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes", response_type=None, method="POST", path_parameters={ @@ -225,7 +225,7 @@ async def get_workspace_list(self) -> list[str]: :return: List of workspace IDs. """ try: - response = await self.run(url="myorg/groups", method="GET") + response = await self.run(url="https://api.powerbi.com/v1.0/myorg/groups", method="GET") list_of_workspaces = response.get("value", []) @@ -243,7 +243,7 @@ async def get_dataset_list(self, *, group_id: str) -> list[str]: :return: List of dataset IDs. """ try: - response = await self.run(url=f"myorg/groups/{group_id}/datasets", method="GET") + response = await self.run(url=f"https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets", method="GET") list_of_datasets = response.get("value", []) @@ -261,7 +261,7 @@ async def cancel_dataset_refresh(self, dataset_id: str, group_id: str, dataset_r :param dataset_refresh_id: The dataset refresh Id. """ await self.run( - url="myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", + url="https://api.powerbi.com/v1.0/myorg/groups/{group_id}/datasets/{dataset_id}/refreshes/{dataset_refresh_id}", response_type=None, path_parameters={ "group_id": group_id,