Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ def _create_compute(
fargate_selectors: list | None = None,
create_fargate_profile_kwargs: dict | None = None,
subnets: list[str] | None = None,
delete_nodegroup_on_failure: bool = True,
):
log = logging.getLogger(__name__)
eks_hook = EksHook(aws_conn_id=aws_conn_id, region_name=region)
Expand All @@ -104,17 +105,49 @@ def _create_compute(
nodeRole=nodegroup_role_arn,
**create_nodegroup_kwargs,
)
if wait_for_completion:
log.info("Waiting for nodegroup to provision. This will take some time.")
wait(
waiter=eks_hook.conn.get_waiter("nodegroup_active"),
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
failure_message="Nodegroup creation failed",
status_message="Nodegroup status is",
status_args=["nodegroup.status"],
try:
if wait_for_completion:
log.info("Waiting for nodegroup to provision. This will take some time.")
wait(
waiter=eks_hook.conn.get_waiter("nodegroup_active"),
waiter_delay=waiter_delay,
waiter_max_attempts=waiter_max_attempts,
args={"clusterName": cluster_name, "nodegroupName": nodegroup_name},
failure_message="Nodegroup creation failed",
status_message="Nodegroup status is",
status_args=["nodegroup.status"],
)

# waiter_with_logging.wait wraps botocore WaiterError in AirflowException.
# WaiterError is caught to handle changes in the implementation of waiter_with_logging.wait.
except (WaiterError, AirflowException):
# Best-effort cleanup when post-initiation steps fail (e.g. IAM/permission errors).
log.exception(
"Nodegroup '%s' in cluster '%s' failed after creation during wait phase",
nodegroup_name,
cluster_name,
)

# delete_nodegroup_on_failure defaults to True to prevent orphaned nodegroups.
if delete_nodegroup_on_failure:
try:
eks_hook.delete_nodegroup(
clusterName=cluster_name,
nodegroupName=nodegroup_name,
)
log.info(
"Issued delete request for nodegroup '%s' in cluster '%s' after failure.",
nodegroup_name,
cluster_name,
)
except ClientError:
log.exception(
"Failed to cleanup nodegroup '%s' in cluster '%s' after failure; "
"manual cleanup may be required.",
nodegroup_name,
cluster_name,
)
raise
elif compute == "fargate" and fargate_profile_name:
# this is to satisfy mypy
create_fargate_profile_kwargs = create_fargate_profile_kwargs or {}
Expand Down Expand Up @@ -199,7 +232,8 @@ class EksCreateClusterOperator(AwsBaseOperator[EksHook]):
:param deferrable: If True, the operator will wait asynchronously for the job to complete.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)

:param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
fails during the wait phase after successful initiation. Defaults to True.
"""

aws_hook_class = EksHook
Expand Down Expand Up @@ -238,6 +272,7 @@ def __init__(
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
waiter_delay: int = 30,
waiter_max_attempts: int = 40,
delete_nodegroup_on_failure: bool = True,
**kwargs,
) -> None:
self.compute = compute
Expand All @@ -258,6 +293,7 @@ def __init__(
self.fargate_selectors = fargate_selectors or [{"namespace": DEFAULT_NAMESPACE_NAME}]
self.fargate_profile_name = fargate_profile_name
self.deferrable = deferrable
self.delete_nodegroup_on_failure = delete_nodegroup_on_failure

if region is not None:
warnings.warn(
Expand Down Expand Up @@ -379,6 +415,7 @@ def deferrable_create_cluster_next(self, context: Context, event: dict[str, Any]
fargate_selectors=self.fargate_selectors,
create_fargate_profile_kwargs=self.create_fargate_profile_kwargs,
subnets=cast("list[str]", self.resources_vpc_config.get("subnetIds")),
delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
)
if self.compute == "fargate":
self.defer(
Expand Down Expand Up @@ -454,7 +491,8 @@ class EksCreateNodegroupOperator(AwsBaseOperator[EksHook]):
:param deferrable: If True, the operator will wait asynchronously for the nodegroup to be created.
This implies waiting for completion. This mode requires aiobotocore module to be installed.
(default: False)

:param delete_nodegroup_on_failure: Whether to attempt best-effort deletion of the managed nodegroup if creation
fails during the wait phase after successful initiation. Defaults to True.
"""

aws_hook_class = EksHook
Expand All @@ -479,6 +517,7 @@ def __init__(
waiter_delay: int = 30,
waiter_max_attempts: int = 80,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
delete_nodegroup_on_failure: bool = True,
**kwargs,
) -> None:
self.nodegroup_subnets = nodegroup_subnets
Expand All @@ -493,6 +532,7 @@ def __init__(
self.waiter_delay = waiter_delay
self.waiter_max_attempts = waiter_max_attempts
self.deferrable = deferrable
self.delete_nodegroup_on_failure = delete_nodegroup_on_failure

if region is not None:
warnings.warn(
Expand Down Expand Up @@ -531,6 +571,7 @@ def execute(self, context: Context):
nodegroup_role_arn=self.nodegroup_role_arn,
create_nodegroup_kwargs=self.create_nodegroup_kwargs,
subnets=self.nodegroup_subnets,
delete_nodegroup_on_failure=self.delete_nodegroup_on_failure,
)

if self.deferrable:
Expand Down
89 changes: 88 additions & 1 deletion providers/amazon/tests/unit/amazon/aws/operators/test_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
from unittest import mock

import pytest
from botocore.exceptions import ClientError
from botocore.waiter import Waiter

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
from airflow.providers.amazon.aws.operators.eks import (
EksCreateClusterOperator,
Expand Down Expand Up @@ -586,6 +587,92 @@ def test_init_with_region(self):
)
assert m.operator.region_name == "us-east-2"

@mock.patch.object(EksHook, "delete_nodegroup")
@mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
@mock.patch.object(EksHook, "create_nodegroup")
def test_nodegroup_cleanup_on_waiter_auth_failure(
self,
mock_create_nodegroup,
mock_waiter,
mock_delete_nodegroup,
):
# Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")
mock_waiter.side_effect = waiter_error

operator = EksCreateNodegroupOperator(
task_id=TASK_ID,
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
nodegroup_subnets=SUBNET_IDS,
nodegroup_role_arn=NODEROLE_ARN[1],
wait_for_completion=True,
delete_nodegroup_on_failure=True,
)

with pytest.raises(AirflowException):
operator.execute({})

# Nodegroup creation happened.
mock_create_nodegroup.assert_called_once()

# Cleanup attempted.
mock_delete_nodegroup.assert_called_once_with(
clusterName=CLUSTER_NAME,
nodegroupName=NODEGROUP_NAME,
)

@mock.patch.object(EksHook, "delete_nodegroup")
@mock.patch("airflow.providers.amazon.aws.operators.eks.wait")
@mock.patch.object(EksHook, "create_nodegroup")
def test_nodegroup_cleanup_failure_does_not_mask_original_error(
self,
mock_create_nodegroup,
mock_waiter,
mock_delete_nodegroup,
):
# Airflow currently wraps waiter errors with AirflowException, but the code intentionally supports both.
waiter_error = AirflowException("Nodegroup creation failed: Waiter NodegroupActive failed")

cleanup_error = ClientError(
error_response={
"Error": {
"Code": "UnauthorizedOperation",
"Message": "You are not authorized to perform this operation",
}
},
operation_name="DeleteNodegroup",
)

mock_waiter.side_effect = waiter_error
mock_delete_nodegroup.side_effect = cleanup_error

operator = EksCreateNodegroupOperator(
task_id=TASK_ID,
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
nodegroup_subnets=SUBNET_IDS,
nodegroup_role_arn=NODEROLE_ARN[1],
wait_for_completion=True,
delete_nodegroup_on_failure=True,
)

with pytest.raises(AirflowException) as exc:
operator.execute({})

# Original error preserved.
assert isinstance(exc.value, AirflowException)
assert "Nodegroup creation failed" in str(exc.value)

# Nodegroup creation happened.
mock_create_nodegroup.assert_called_once()

# Cleanup attempted.
mock_delete_nodegroup.assert_called_once_with(
clusterName=CLUSTER_NAME,
nodegroupName=NODEGROUP_NAME,
)


class TestEksDeleteClusterOperator:
def setup_method(self) -> None:
Expand Down