Skip to content

Commit

Permalink
Move string arg evals to execute() in EksCreateClusterOperator
Browse files Browse the repository at this point in the history
Currently there are string-value evaluations of `compute`, `nodegroup_role_arn`,  and `fargate_pod_execution_role_arn` args in the constructor of `EksCreateClusterOperator`.  These args are all listed as a template fields so it's entirely possible that the value(s) passed in to the operator is a Jinja expression or an `XComArg`. Either of these value types could cause a false-negative `ValueError` (in the case of unsupported `compute` values) or a `false-positive` (in the the cases of explicit checks for the *arn values) since the values themselves have not been rendered.

This PR moves the evaluations of these args to the `execute()` scope.
  • Loading branch information
josh-fell committed May 25, 2022
1 parent d788f4b commit 69c0469
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 14 deletions.
28 changes: 14 additions & 14 deletions airflow/providers/amazon/aws/operators/eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,6 @@ def __init__(
region: Optional[str] = None,
**kwargs,
) -> None:
if compute:
if compute not in SUPPORTED_COMPUTE_VALUES:
raise ValueError("Provided compute type is not supported.")
elif (compute == 'nodegroup') and not nodegroup_role_arn:
raise ValueError(
MISSING_ARN_MSG.format(compute=NODEGROUP_FULL_NAME, requirement='nodegroup_role_arn')
)
elif (compute == 'fargate') and not fargate_pod_execution_role_arn:
raise ValueError(
MISSING_ARN_MSG.format(
compute=FARGATE_FULL_NAME, requirement='fargate_pod_execution_role_arn'
)
)

self.compute = compute
self.cluster_name = cluster_name
self.cluster_role_arn = cluster_role_arn
Expand All @@ -170,6 +156,20 @@ def __init__(
super().__init__(**kwargs)

def execute(self, context: 'Context'):
if self.compute:
if self.compute not in SUPPORTED_COMPUTE_VALUES:
raise ValueError("Provided compute type is not supported.")
elif (self.compute == 'nodegroup') and not self.nodegroup_role_arn:
raise ValueError(
MISSING_ARN_MSG.format(compute=NODEGROUP_FULL_NAME, requirement='nodegroup_role_arn')
)
elif (self.compute == 'fargate') and not self.fargate_pod_execution_role_arn:
raise ValueError(
MISSING_ARN_MSG.format(
compute=FARGATE_FULL_NAME, requirement='fargate_pod_execution_role_arn'
)
)

eks_hook = EksHook(
aws_conn_id=self.aws_conn_id,
region_name=self.region,
Expand Down
38 changes: 38 additions & 0 deletions tests/providers/amazon/aws/operators/test_eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from typing import Any, Dict, List
from unittest import mock

import pytest

from airflow.providers.amazon.aws.hooks.eks import ClusterStates, EksHook
from airflow.providers.amazon.aws.operators.eks import (
EksCreateClusterOperator,
Expand Down Expand Up @@ -202,6 +204,42 @@ def test_execute_when_called_with_fargate_creates_both(
**convert_keys(self.create_fargate_profile_params)
)

def test_invalid_compute_value(self):
invalid_compute = EksCreateClusterOperator(
task_id=TASK_ID,
**self.create_cluster_params,
compute='infinite',
)

with pytest.raises(ValueError, match="Provided compute type is not supported."):
invalid_compute.execute({})

def test_nodegroup_compute_missing_nodegroup_role_arn(self):
missing_nodegroup_role_arn = EksCreateClusterOperator(
task_id=TASK_ID,
**self.create_cluster_params,
compute='nodegroup',
)

with pytest.raises(
ValueError,
match="Creating an Amazon EKS managed node groups requires nodegroup_role_arn to be passed in.",
):
missing_nodegroup_role_arn.execute({})

def test_fargate_compute_missing_fargate_pod_execution_role_arn(self):
missing_fargate_pod_execution_role_arn = EksCreateClusterOperator(
task_id=TASK_ID,
**self.create_cluster_params,
compute='fargate',
)

with pytest.raises(
ValueError,
match="Creating an AWS Fargate profiles requires fargate_pod_execution_role_arn to be passed in.",
):
missing_fargate_pod_execution_role_arn.execute({})


class TestEksCreateFargateProfileOperator(unittest.TestCase):
def setUp(self) -> None:
Expand Down

0 comments on commit 69c0469

Please sign in to comment.