Skip to content

Commit

Permalink
Decide status_field in base class
Browse files Browse the repository at this point in the history
  • Loading branch information
hankehly committed Sep 6, 2022
1 parent 1facc0c commit afefe36
Showing 1 changed file with 4 additions and 10 deletions.
14 changes: 4 additions & 10 deletions airflow/providers/amazon/aws/sensors/rds.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,19 @@ def _describe_item(self, item_type: str, item_name: str) -> list:

def _check_item(self, item_type: str, item_name: str) -> bool:
"""Get certain item from `_describe_item()` and check its status"""
if item_type == "db_instance":
status_field = "DBInstanceStatus"
else:
status_field = "Status"
try:
items = self._describe_item(item_type, item_name)
except ClientError:
return False
else:
status_field = self._check_status_field()
return bool(items) and any(
map(lambda status: items[0][status_field].lower() == status, self.target_statuses)
)

def _check_status_field(self) -> str:
"""Return the name of the '_describe_item' response field that corresponds to the resource status."""
return "Status"


class RdsSnapshotExistenceSensor(RdsBaseSensor):
"""
Expand Down Expand Up @@ -195,11 +194,6 @@ def poke(self, context: 'Context'):
item_type = self._check_item_type()
return self._check_item(item_type=item_type, item_name=self.db_identifier)

def _check_status_field(self) -> str:
if self.db_type == RdsDbType.INSTANCE:
return "DBInstanceStatus"
return "Status"

def _check_item_type(self):
if self.db_type == RdsDbType.CLUSTER:
return "db_cluster"
Expand Down

0 comments on commit afefe36

Please sign in to comment.