Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move AWS Managed Service for Apache Flink sensor states to Hook #40896

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions airflow/providers/amazon/aws/hooks/kinesis_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ class KinesisAnalyticsV2Hook(AwsBaseHook):
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""

APPLICATION_START_INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", "AUTOSCALING")
APPLICATION_START_FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"STOPPING",
"READY",
"FORCE_STOPPING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
APPLICATION_START_SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)

APPLICATION_STOP_INTERMEDIATE_STATES: tuple[str, ...] = (
"STARTING",
"UPDATING",
"AUTOSCALING",
"RUNNING",
"STOPPING",
"FORCE_STOPPING",
)
APPLICATION_STOP_FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
APPLICATION_STOP_SUCCESS_STATES: tuple[str, ...] = ("READY",)

def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "kinesisanalyticsv2"
super().__init__(*args, **kwargs)
162 changes: 77 additions & 85 deletions airflow/providers/amazon/aws/sensors/kinesis_analytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,72 @@
from airflow.utils.context import Context


class KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
class KinesisAnalyticsV2BaseSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
"""
General sensor behaviour for AWS Managed Service for Apache Flink.

Subclasses must set the following fields:
- ``INTERMEDIATE_STATES``
- ``FAILURE_STATES``
- ``SUCCESS_STATES``
- ``FAILURE_MESSAGE``
- ``SUCCESS_MESSAGE``

:param application_name: Application name.
:param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore
module to be installed.
(default: False, but can be overridden in config file by setting default_deferrable to True)

"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"

INTERMEDIATE_STATES: tuple[str, ...] = ()
FAILURE_STATES: tuple[str, ...] = ()
SUCCESS_STATES: tuple[str, ...] = ()
FAILURE_MESSAGE = ""
SUCCESS_MESSAGE = ""

def __init__(
self,
application_name: str,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
):
super().__init__(**kwargs)
self.application_name = application_name
self.deferrable = deferrable

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"%s `%s`.",
self.SUCCESS_MESSAGE,
self.application_name,
)
return True

return False


class KinesisAnalyticsV2StartApplicationCompletedSensor(KinesisAnalyticsV2BaseSensor):
"""
Waits for AWS Managed Service for Apache Flink application to start.

Expand Down Expand Up @@ -59,21 +124,12 @@ class KinesisAnalyticsV2StartApplicationCompletedSensor(AwsBaseSensor[KinesisAna

"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"
INTERMEDIATE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_INTERMEDIATE_STATES
FAILURE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_FAILURE_STATES
SUCCESS_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_START_SUCCESS_STATES

INTERMEDIATE_STATES: tuple[str, ...] = ("STARTING", "UPDATING", "AUTOSCALING")
FAILURE_STATES: tuple[str, ...] = (
"DELETING",
"STOPPING",
"READY",
"FORCE_STOPPING",
"ROLLING_BACK",
"MAINTENANCE",
"ROLLED_BACK",
)
SUCCESS_STATES: tuple[str, ...] = ("RUNNING",)
FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application start failed."
SUCCESS_MESSAGE = "AWS Managed Service for Apache Flink application started successfully"

template_fields: Sequence[str] = aws_template_fields("application_name")

Expand All @@ -83,14 +139,12 @@ def __init__(
application_name: str,
max_retries: int = 75,
poke_interval: int = 120,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
super().__init__(application_name=application_name, **kwargs)
self.application_name = application_name
self.max_retries = max_retries
self.poke_interval = poke_interval
self.deferrable = deferrable

def execute(self, context: Context) -> Any:
if self.deferrable:
Expand All @@ -110,34 +164,8 @@ def execute(self, context: Context) -> Any:
else:
super().execute(context=context)

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"AWS Managed Service for Apache Flink application started successfully `%s`.",
self.application_name,
)
return True

return False


class KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnalyticsV2Hook]):
class KinesisAnalyticsV2StopApplicationCompletedSensor(KinesisAnalyticsV2BaseSensor):
"""
Waits for AWS Managed Service for Apache Flink application to stop.

Expand Down Expand Up @@ -165,20 +193,12 @@ class KinesisAnalyticsV2StopApplicationCompletedSensor(AwsBaseSensor[KinesisAnal

"""

aws_hook_class = KinesisAnalyticsV2Hook
ui_color = "#66c3ff"
INTERMEDIATE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_INTERMEDIATE_STATES
FAILURE_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_FAILURE_STATES
SUCCESS_STATES: tuple[str, ...] = KinesisAnalyticsV2Hook.APPLICATION_STOP_SUCCESS_STATES

INTERMEDIATE_STATES: tuple[str, ...] = (
"STARTING",
"UPDATING",
"AUTOSCALING",
"RUNNING",
"STOPPING",
"FORCE_STOPPING",
)
FAILURE_STATES: tuple[str, ...] = ("DELETING", "ROLLING_BACK", "MAINTENANCE", "ROLLED_BACK")
SUCCESS_STATES: tuple[str, ...] = ("READY",)
FAILURE_MESSAGE = "AWS Managed Service for Apache Flink application stop failed."
SUCCESS_MESSAGE = "AWS Managed Service for Apache Flink application stopped successfully"

template_fields: Sequence[str] = aws_template_fields("application_name")

Expand All @@ -188,14 +208,12 @@ def __init__(
application_name: str,
max_retries: int = 75,
poke_interval: int = 120,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
) -> None:
super().__init__(**kwargs)
super().__init__(application_name=application_name, **kwargs)
self.application_name = application_name
self.max_retries = max_retries
self.poke_interval = poke_interval
self.deferrable = deferrable

def execute(self, context: Context) -> Any:
if self.deferrable:
Expand All @@ -214,29 +232,3 @@ def execute(self, context: Context) -> Any:
)
else:
super().execute(context=context)

def poke(self, context: Context, **kwargs) -> bool:
status = self.hook.conn.describe_application(ApplicationName=self.application_name)[
"ApplicationDetail"
]["ApplicationStatus"]

self.log.info(
"Poking for AWS Managed Service for Apache Flink application: %s status: %s",
self.application_name,
status,
)

if status in self.FAILURE_STATES:
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
if self.soft_fail:
raise AirflowSkipException(self.FAILURE_MESSAGE)
raise AirflowException(self.FAILURE_MESSAGE)

if status in self.SUCCESS_STATES:
self.log.info(
"AWS Managed Service for Apache Flink application stopped successfully `%s`.",
self.application_name,
)
return True

return False
1 change: 1 addition & 0 deletions tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ class TestAmazonProviderProjectStructure(ExampleCoverageTest):
"airflow.providers.amazon.aws.transfers.base.AwsToAwsBaseOperator",
"airflow.providers.amazon.aws.operators.comprehend.ComprehendBaseOperator",
"airflow.providers.amazon.aws.sensors.comprehend.ComprehendBaseSensor",
"airflow.providers.amazon.aws.sensors.kinesis_analytics.KinesisAnalyticsV2BaseSensor",
}

MISSING_EXAMPLES_FOR_CLASSES = {
Expand Down