diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
index a35a206cadc51..2b6ae60dc1875 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py
@@ -89,6 +89,7 @@ def _create_compute(
     fargate_selectors: list | None = None,
     create_fargate_profile_kwargs: dict | None = None,
     subnets: list[str] | None = None,
+    delete_nodegroup_on_failure: bool = True,
 ):
     log = logging.getLogger(__name__)
     eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
@@ -104,17 +105,49 @@ def _create_compute(
             nodeRole=nodegroup_role_arn,
             **create_nodegroup_kwargs,
         )
-        if wait_for_completion:
-            log.info("Waiting for nodegroup to provision. This will take some time.")
-            wait(
-                waiter=eks_hook.conn.get_waiter("nodegroup_active"),
-                waiter_delay=waiter_delay,
-                waiter_max_attempts=waiter_max_attempts,
-                args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
-                failure_message="Nodegroup creation failed",
-                status_message="Nodegroup status is",
-                status_args=["nodegroup.status"],
+        try:
+            if wait_for_completion:
+                log.info("Waiting for nodegroup to provision. This will take some time.")
+                wait(
+                    waiter=eks_hook.conn.get_waiter("nodegroup_active"),
+                    waiter_delay=waiter_delay,
+                    waiter_max_attempts=waiter_max_attempts,
+                    args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
+                    failure_message="Nodegroup creation failed",
+                    status_message="Nodegroup status is",
+                    status_args=["nodegroup.status"],
+                )
+
+        # waiter_with_logging.wait wraps botocore WaiterError in AirflowException.
+        # WaiterError is caught to handle changes in the implementation of waiter_with_logging.wait.
+        except (WaiterError, AirflowException):
+            # Best-effort cleanup when post-initiation steps fail (e.g. IAM/permission errors).
+            log.exception(
+                "Nodegroup '%s' in cluster '%s' failed after creation during wait phase",
+                nodegroup_name,
+                cluster_name,
             )
+
+            # delete_nodegroup_on_failure defaults to True to prevent orphaned nodegroups.
+            if delete_nodegroup_on_failure:
+                try:
+                    eks_hook.delete_nodegroup(
+                        clusterName=cluster_name,
+                        nodegroupName=nodegroup_name,
+                    )
+                    log.info(
+                        "Issued delete request for nodegroup '%s' in cluster '%s' after failure.",
+                        nodegroup_name,
+                        cluster_name,
+                    )
+                except Exception:  # Best-effort: cleanup errors must not mask the original one.
+                    log.exception(
+                        "Failed to cleanup nodegroup '%s' in cluster '%s' after failure; "
+                        "manual cleanup may be required.",
+                        nodegroup_name,
+                        cluster_name,
+                    )
+            raise  # Propagate the original wait failure after the cleanup attempt.
     elif compute == "fargate" and fargate_profile_name:
         # this is to satisfy mypy
         create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
@@ -199,7 +232,8 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
     :param deferrable: If True, the operator will wait asynchronously for the job to complete.
         This implies waiting for completion. This mode requires aiobotocore module to be installed.
         (default: False)
-
+    :param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
+        fails during the wait phase after successful initiation. Defaults to True.
     """
 
     aws_hook_class = EksHook
@@ -238,6 +272,7 @@ def __init__(
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         waiter_delay: int = 30,
         waiter_max_attempts: int = 40,
+        delete_nodegroup_on_failure: bool = True,
         **kwargs,
     ) -> None:
         self.compute = compute
@@ -258,6 +293,7 @@ def __init__(
         self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}]
         self.fargate_profile_name = fargate_profile_name
         self.deferrable = deferrable
+        self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
 
         if region is not None:
             warnings.warn(
@@ -379,6 +415,7 @@ def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any]
                 fargate_selectors=self.fargate_selectors,
                 create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
                 subnets=cast("list[str]", self.resources_vpc_config.get("subnetIds")),
+                delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
             )
             if self.compute == "fargate":
                 self.defer(
@@ -454,7 +491,8 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
     :param deferrable: If True, the operator will wait asynchronously for the nodegroup to be created.
         This implies waiting for completion. This mode requires aiobotocore module to be installed.
         (default: False)
-
+    :param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
+        fails during the wait phase after successful initiation. Defaults to True.
     """
 
     aws_hook_class = EksHook
@@ -479,6 +517,7 @@ def __init__(
         waiter_delay: int = 30,
         waiter_max_attempts: int = 80,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        delete_nodegroup_on_failure: bool = True,
         **kwargs,
     ) -> None:
         self.nodegroup_subnets = nodegroup_subnets
@@ -493,6 +532,7 @@ def __init__(
         self.waiter_delay = waiter_delay
         self.waiter_max_attempts = waiter_max_attempts
         self.deferrable = deferrable
+        self.delete_nodegroup_on_failure = delete_nodegroup_on_failure
 
         if region is not None:
             warnings.warn(
@@ -531,6 +571,7 @@ def execute(self, context: Context):
             nodegroup_role_arn=self.nodegroup_role_arn,
             create_nodegroup_kwargs=self.create_nodegroup_kwargs,
             subnets=self.nodegroup_subnets,
+            delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
         )
 
         if self.deferrable:
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
index d997483a60db6..3cf453cb5e051 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
@@ -21,9 +21,10 @@
 from unittest import mock
 
 import pytest
+from botocore.exceptions import ClientError
 from botocore.waiter import Waiter
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
 from airflow.providers.amazon.aws.operators.eks import (
     EksCreateClusterOperator,
@@ -586,6 +587,92 @@ def test_init_with_region(self):
             )
         assert m.operator.region_name == "us-east-2"
 
+    @mock.patch.object(EksHook, "delete_nodegroup")
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+    @mock.patch.object(EksHook, "create_nodegroup")
+    def test_nodegroup_cleanup_on_waiter_auth_failure(
+        self,
+        mock_create_nodegroup,
+        mock_waiter,
+        mock_delete_nodegroup,
+    ):
+        # Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
+        waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")
+        mock_waiter.side_effect = waiter_error
+
+        operator = EksCreateNodegroupOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            nodegroup_name=NODEGROUP_NAME,
+            nodegroup_subnets=SUBNET_IDS,
+            nodegroup_role_arn=NODEROLE_ARN[1],
+            wait_for_completion=True,
+            delete_nodegroup_on_failure=True,
+        )
+
+        with pytest.raises(AirflowException):
+            operator.execute({})
+
+        # Nodegroup creation happened.
+        mock_create_nodegroup.assert_called_once()
+
+        # Cleanup attempted.
+        mock_delete_nodegroup.assert_called_once_with(
+            clusterName=CLUSTER_NAME,
+            nodegroupName=NODEGROUP_NAME,
+        )
+
+    @mock.patch.object(EksHook, "delete_nodegroup")
+    @mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
+    @mock.patch.object(EksHook, "create_nodegroup")
+    def test_nodegroup_cleanup_failure_does_not_mask_original_error(
+        self,
+        mock_create_nodegroup,
+        mock_waiter,
+        mock_delete_nodegroup,
+    ):
+        # Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
+        waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")
+
+        cleanup_error = ClientError(
+            error_response={
+                "Error": {
+                    "Code": "UnauthorizedOperation",
+                    "Message": "You are not authorized to perform this operation",
+                }
+            },
+            operation_name="DeleteNodegroup",
+        )
+
+        mock_waiter.side_effect = waiter_error
+        mock_delete_nodegroup.side_effect = cleanup_error
+
+        operator = EksCreateNodegroupOperator(
+            task_id=TASK_ID,
+            cluster_name=CLUSTER_NAME,
+            nodegroup_name=NODEGROUP_NAME,
+            nodegroup_subnets=SUBNET_IDS,
+            nodegroup_role_arn=NODEROLE_ARN[1],
+            wait_for_completion=True,
+            delete_nodegroup_on_failure=True,
+        )
+
+        with pytest.raises(AirflowException) as exc:
+            operator.execute({})
+
+        # Original error preserved.
+        assert isinstance(exc.value, AirflowException)
+        assert "Nodegroup creation failed" in str(exc.value)
+
+        # Nodegroup creation happened.
+        mock_create_nodegroup.assert_called_once()
+
+        # Cleanup attempted.
+        mock_delete_nodegroup.assert_called_once_with(
+            clusterName=CLUSTER_NAME,
+            nodegroupName=NODEGROUP_NAME,
+        )
+
 
 class TestEksDeleteClusterOperator:
     def setup_method(self) -> None: