From f93459de6edddfd08fe64a23d8519e51940c84cd Mon Sep 17 00:00:00 2001 From: Ramit Kataria Date: Thu, 3 Jul 2025 17:37:29 -0700 Subject: [PATCH] More robust handling of `BaseHook.get_connection`'s `CONNECTION_NOT_FOUND` Task SDK exception After #51873, the base hook's `get_connection` exception for when connection is not found was changed and was no longer being caught by `AwsGenericHook`. This change fixes that by using a robust approach. At some point, we should probably make the connection getter raise exceptions in a more consistent manner, maybe once we fully switch over to Task SDK. --- .../providers/amazon/aws/hooks/base_aws.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py index 18e9d396230a9..524af9722da29 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py @@ -82,7 +82,7 @@ if AIRFLOW_V_3_0_PLUS: - from airflow.sdk.exceptions import AirflowRuntimeError + from airflow.sdk.exceptions import AirflowRuntimeError, ErrorType if TYPE_CHECKING: from aiobotocore.session import AioSession @@ -623,19 +623,16 @@ def conn_config(self) -> AwsConnectionWrapper: """Get the Airflow Connection object and wrap it in helper (cached).""" connection = None if self.aws_conn_id: - possible_exceptions: tuple[type[Exception], ...] - - if AIRFLOW_V_3_0_PLUS: - possible_exceptions = (AirflowNotFoundException, AirflowRuntimeError) - else: - possible_exceptions = (AirflowNotFoundException,) - try: connection = self.get_connection(self.aws_conn_id) - except possible_exceptions as e: - if isinstance( - e, AirflowNotFoundException - ) or f"Connection with ID {self.aws_conn_id} not found" in str(e): + except Exception as e: + not_found_exc_via_core = isinstance(e, AirflowNotFoundException) + not_found_exc_via_task_sdk = ( + AIRFLOW_V_3_0_PLUS + and isinstance(e, AirflowRuntimeError) + and e.error.error == ErrorType.CONNECTION_NOT_FOUND + ) + if not_found_exc_via_core or not_found_exc_via_task_sdk: self.log.warning( "Unable to find AWS Connection ID '%s', switching to empty.", self.aws_conn_id )