diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py index 18e9d396230a9..524af9722da29 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py @@ -82,7 +82,7 @@ if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.exceptions import AirflowRuntimeError + from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType if TYPE_CHECKING: from aiobotocore.session import AioSession @@ -623,19 +623,16 @@ def conn_config(self) -> AwsConnectionWrapper: """Get the Airflow Connection object and wrap it in helper (cached).""" connection = None if self.aws_conn_id: - possible_exceptions: tuple[type[Exception], ...] - - if AIRFLOW_V_3_0_PLUS: - possible_exceptions = (AirflowNotFoundException, AirflowRuntimeError) - else: - possible_exceptions = (AirflowNotFoundException,) - try: connection = self.get_connection(self.aws_conn_id) - except possible_exceptions as e: - if isinstance( - e, AirflowNotFoundException - ) or f"Connection with ID {self.aws_conn_id} not found" in str(e): + except Exception as e: + not_found_exc_via_core = isinstance(e, AirflowNotFoundException) + not_found_exc_via_task_sdk = ( + AIRFLOW_V_3_0_PLUS + and isinstance(e, AirflowRuntimeError) + and e.error.error == ErrorType.CONNECTION_NOT_FOUND + ) + if not_found_exc_via_core or not_found_exc_via_task_sdk: self.log.warning( "Unable to find AWS Connection ID '%s', switching to empty.", self.aws_conn_id )