Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Jun 18, 2025

closes: #51672

Moving the BaseHook class into task SDK, exactly where it should live similar to other base classes in sdk like notifiers, operators etc.

Testing: Running by all the methods defined on BaseHook -- old vs new

Running with the new path

DAG:

from __future__ import annotations

from contextlib import suppress

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.sdk import BaseHook


class CustomOperator(BaseOperator):
    def execute(self, context):
        conn = BaseHook.get_connection("athena_default")
        print("got connection from basehook", conn)


        hook = BaseHook.get_hook("athena_default")
        print("got hook from basehook", hook)

        with suppress(NotImplementedError):
            BaseHook.get_conn(hook)
            print("Raising a NotImplementedError, trying to access get_conn")

        logger = BaseHook.logger()
        print("default logger is", logger)


with DAG("get_connection_basehook", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="set_var")

image

Running with the older path to check backcompat

from __future__ import annotations

from contextlib import suppress

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.hooks.base import BaseHook


class CustomOperator(BaseOperator):
    def execute(self, context):
        conn = BaseHook.get_connection("athena_default")
        print("got connection from basehook", conn)


        hook = BaseHook.get_hook("athena_default")
        print("got hook from basehook", hook)

        with suppress(NotImplementedError):
            BaseHook.get_conn(hook)
            print("Raising a NotImplementedError, trying to access get_conn")

        logger = BaseHook.logger()
        print("default logger is", logger)


with DAG("get_connection_basehook", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="set_var")

image


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@amoghrajesh
Copy link
Contributor Author

Green again finally!!

@amoghrajesh amoghrajesh requested a review from kaxil July 1, 2025 12:22
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's create a GIthub issue to resolve Connection thing to have real sdk.Connection and for #51873 (comment) and #51873 (comment)

@amoghrajesh
Copy link
Contributor Author

Thanks for the review @kaxil!

Here are the issues which I will work on as follow ups:

  1. Replace API server’s direct Connection access workaround before 3.1
  2. Move all BaseHook usages in providers to version_compat (will expand this further)
  3. Investigate better mypy typing pattern for BaseHook

@amoghrajesh amoghrajesh merged commit ea5d5c3 into apache:main Jul 2, 2025
101 checks passed
@amoghrajesh amoghrajesh deleted the move-basehook-to-task-sdk branch July 2, 2025 04:49
@ferruzzi
Copy link
Contributor

ferruzzi commented Jul 3, 2025

All of he AWS system tests have been failing since this commit merged. I reverted this one and #52766 locally to make sure and they pass with this reverted. I'm looking into it, but if any cause comes to mind, let me know.

     
    ╭─────────────────────────────── Traceback (most recent call last) ────────────────────────────────╮
    │ /opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py:875 in run                   │
    │                                                                                                  │
    │    872 │   │   │   │   return state, msg, error                                                  │
    │    873 │   │   │                                                                                 │
    │    874 │   │   │   try:                                                                          │
    │ ❱  875 │   │   │   │   result = _execute_task(context=context, ti=ti, log=log)                   │
    │    876 │   │   │   except Exception:                                                             │
    │    877 │   │   │   │   import jinja2                                                             │
    │    878                                                                                           │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │        context = {                                                                           │ │
    │ │                  │   'dag': <DAG dag_id='example_athena' schedule='@once' #tasks=13>,        │ │
    │ │                  │   'inlets': [],                                                           │ │
    │ │                  │   'map_index_template': None,                                             │ │
    │ │                  │   'outlets': [],                                                          │ │
    │ │                  │   'run_id': 'manual__2025-07-03T18:33:34.723934+00:00',                   │ │
    │ │                  │   'task': <Task(S3CreateBucketOperator): create_s3_bucket>,               │ │
    │ │                  │   'task_instance': <RuntimeTaskInstance                                   │ │
    │ │                  │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                     │ │
    │ │                  │   │   task_id='create_s3_bucket'                                          │ │
    │ │                  │   │   dag_id='example_athena'                                             │ │
    │ │                  │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                   │ │
    │ │                  │   │   max_tries=0                                                         │ │
    │ │                  │   │   task=<class                                                         │ │
    │ │                  'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>         │ │
    │ │                  │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,        │ │
    │ │                  tzinfo=datetime.timezone.utc)                                               │ │
    │ │                  │   >,                                                                      │ │
    │ │                  │   'ti': <RuntimeTaskInstance                                              │ │
    │ │                  │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                     │ │
    │ │                  │   │   task_id='create_s3_bucket'                                          │ │
    │ │                  │   │   dag_id='example_athena'                                             │ │
    │ │                  │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                   │ │
    │ │                  │   │   max_tries=0                                                         │ │
    │ │                  │   │   task=<class                                                         │ │
    │ │                  'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>         │ │
    │ │                  │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,        │ │
    │ │                  tzinfo=datetime.timezone.utc)                                               │ │
    │ │                  │   >,                                                                      │ │
    │ │                  │   'outlet_events':                                                        │ │
    │ │                  <airflow.sdk.execution_time.context.OutletEventAccessors object at          │ │
    │ │                  0x7416447b5360>,                                                            │ │
    │ │                  │   'inlet_events': InletEventsAccessors(                                   │ │
    │ │                  │   │   _inlets=[],                                                         │ │
    │ │                  │   │   _assets={},                                                         │ │
    │ │                  │   │   _asset_aliases={}                                                   │ │
    │ │                  │   ),                                                                      │ │
    │ │                  │   ... +20                                                                 │ │
    │ │                  }                                                                           │ │
    │ │              e = AirflowRuntimeError("CONNECTION_NOT_FOUND: {'conn_id': 'aws_default'}")     │ │
    │ │     early_exit = None                                                                        │ │
    │ │          error = None                                                                        │ │
    │ │         jinja2 = <module 'jinja2' from                                                       │ │
    │ │                  '/usr/local/lib/python3.10/site-packages/jinja2/__init__.py'>               │ │
    │ │ keys_to_delete = []                                                                          │ │
    │ │            log = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None,     │ │
    │ │                  context_class=None, initial_values={'logger_name': 'task'},                 │ │
    │ │                  logger_factory_args=())>                                                    │ │
    │ │            msg = None                                                                        │ │
    │ │             ti = <RuntimeTaskInstance                                                        │ │
    │ │                  │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                         │ │
    │ │                  │   task_id='create_s3_bucket'                                              │ │
    │ │                  │   dag_id='example_athena'                                                 │ │
    │ │                  │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                       │ │
    │ │                  │   max_tries=0                                                             │ │
    │ │                  │   task=<class                                                             │ │
    │ │                  'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>         │ │
    │ │                  │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,            │ │
    │ │                  tzinfo=datetime.timezone.utc)                                               │ │
    │ │                  >                                                                           │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py:1162 in _execute_task        │
    │                                                                                                  │
    │   1159 │   │   │   task.on_kill()                                                                │
    │   1160 │   │   │   raise                                                                         │
    │   1161 │   else:                                                                                 │
    │ ❱ 1162 │   │   result = ctx.run(execute, context=context)                                        │
    │   1163 │                                                                                         │
    │   1164 │   if (post_execute_hook := task._post_execute_hook) is not None:                        │
    │   1165 │   │   create_executable_runner(post_execute_hook, outlet_events, logger=log).run(conte  │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │ airflow_context_vars = {                                                                     │ │
    │ │                        │   'AIRFLOW_CTX_DAG_OWNER': 'airflow',                               │ │
    │ │                        │   'AIRFLOW_CTX_DAG_ID': 'example_athena',                           │ │
    │ │                        │   'AIRFLOW_CTX_TASK_ID': 'create_s3_bucket',                        │ │
    │ │                        │   'AIRFLOW_CTX_LOGICAL_DATE': '2025-07-03T18:33:34.499936+00:00',   │ │
    │ │                        │   'AIRFLOW_CTX_TRY_NUMBER': '1',                                    │ │
    │ │                        │   'AIRFLOW_CTX_DAG_RUN_ID':                                         │ │
    │ │                        'manual__2025-07-03T18:33:34.723934+00:00'                            │ │
    │ │                        }                                                                     │ │
    │ │              context = {                                                                     │ │
    │ │                        │   'dag': <DAG dag_id='example_athena' schedule='@once' #tasks=13>,  │ │
    │ │                        │   'inlets': [],                                                     │ │
    │ │                        │   'map_index_template': None,                                       │ │
    │ │                        │   'outlets': [],                                                    │ │
    │ │                        │   'run_id': 'manual__2025-07-03T18:33:34.723934+00:00',             │ │
    │ │                        │   'task': <Task(S3CreateBucketOperator): create_s3_bucket>,         │ │
    │ │                        │   'task_instance': <RuntimeTaskInstance                             │ │
    │ │                        │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')               │ │
    │ │                        │   │   task_id='create_s3_bucket'                                    │ │
    │ │                        │   │   dag_id='example_athena'                                       │ │
    │ │                        │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'             │ │
    │ │                        │   │   max_tries=0                                                   │ │
    │ │                        │   │   task=<class                                                   │ │
    │ │                        'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>   │ │
    │ │                        │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,  │ │
    │ │                        tzinfo=datetime.timezone.utc)                                         │ │
    │ │                        │   >,                                                                │ │
    │ │                        │   'ti': <RuntimeTaskInstance                                        │ │
    │ │                        │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')               │ │
    │ │                        │   │   task_id='create_s3_bucket'                                    │ │
    │ │                        │   │   dag_id='example_athena'                                       │ │
    │ │                        │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'             │ │
    │ │                        │   │   max_tries=0                                                   │ │
    │ │                        │   │   task=<class                                                   │ │
    │ │                        'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>   │ │
    │ │                        │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,  │ │
    │ │                        tzinfo=datetime.timezone.utc)                                         │ │
    │ │                        │   >,                                                                │ │
    │ │                        │   'outlet_events':                                                  │ │
    │ │                        <airflow.sdk.execution_time.context.OutletEventAccessors object at    │ │
    │ │                        0x7416447b5360>,                                                      │ │
    │ │                        │   'inlet_events': InletEventsAccessors(                             │ │
    │ │                        │   │   _inlets=[],                                                   │ │
    │ │                        │   │   _assets={},                                                   │ │
    │ │                        │   │   _asset_aliases={}                                             │ │
    │ │                        │   ),                                                                │ │
    │ │                        │   ... +20                                                           │ │
    │ │                        }                                                                     │ │
    │ │                  ctx = <_contextvars.Context object at 0x7416448fda80>                       │ │
    │ │              execute = <bound method S3CreateBucketOperator.execute of                       │ │
    │ │                        <Task(S3CreateBucketOperator): create_s3_bucket>>                     │ │
    │ │                  log = <BoundLoggerLazyProxy(logger=None, wrapper_class=None,                │ │
    │ │                        processors=None, context_class=None, initial_values={'logger_name':   │ │
    │ │                        'task'}, logger_factory_args=())>                                     │ │
    │ │          next_method = None                                                                  │ │
    │ │        outlet_events = <airflow.sdk.execution_time.context.OutletEventAccessors object at    │ │
    │ │                        0x7416447b5360>                                                       │ │
    │ │     pre_execute_hook = <bound method BaseOperator.pre_execute of                             │ │
    │ │                        <Task(S3CreateBucketOperator): create_s3_bucket>>                     │ │
    │ │                 task = <Task(S3CreateBucketOperator): create_s3_bucket>                      │ │
    │ │                   ti = <RuntimeTaskInstance                                                  │ │
    │ │                        │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                   │ │
    │ │                        │   task_id='create_s3_bucket'                                        │ │
    │ │                        │   dag_id='example_athena'                                           │ │
    │ │                        │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                 │ │
    │ │                        │   max_tries=0                                                       │ │
    │ │                        │   task=<class                                                       │ │
    │ │                        'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>   │ │
    │ │                        │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,      │ │
    │ │                        tzinfo=datetime.timezone.utc)                                         │ │
    │ │                        >                                                                     │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py:397 in wrapper                           │
    │                                                                                                  │
    │    394 │   │   │   │   │   token = cls.tracker.set(self)                                         │
    │    395 │   │   │   │   │   stack.callback(cls.tracker.reset, token)                              │
    │    396 │   │   │   │                                                                             │
    │ ❱  397 │   │   │   │   return func(self, *args, **kwargs)                                        │
    │    398 │   │                                                                                     │
    │    399 │   │   return wrapper                                                                    │
    │    400                                                                                           │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │         args = ()                                                                            │ │
    │ │       kwargs = {                                                                             │ │
    │ │                │   'context': {                                                              │ │
    │ │                │   │   'dag': <DAG dag_id='example_athena' schedule='@once' #tasks=13>,      │ │
    │ │                │   │   'inlets': [],                                                         │ │
    │ │                │   │   'map_index_template': None,                                           │ │
    │ │                │   │   'outlets': [],                                                        │ │
    │ │                │   │   'run_id': 'manual__2025-07-03T18:33:34.723934+00:00',                 │ │
    │ │                │   │   'task': <Task(S3CreateBucketOperator): create_s3_bucket>,             │ │
    │ │                │   │   'task_instance': <RuntimeTaskInstance                                 │ │
    │ │                │   │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                   │ │
    │ │                │   │   │   task_id='create_s3_bucket'                                        │ │
    │ │                │   │   │   dag_id='example_athena'                                           │ │
    │ │                │   │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                 │ │
    │ │                │   │   │   max_tries=0                                                       │ │
    │ │                │   │   │   task=<class                                                       │ │
    │ │                'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>           │ │
    │ │                │   │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,      │ │
    │ │                tzinfo=datetime.timezone.utc)                                                 │ │
    │ │                │   │   >,                                                                    │ │
    │ │                │   │   'ti': <RuntimeTaskInstance                                            │ │
    │ │                │   │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                   │ │
    │ │                │   │   │   task_id='create_s3_bucket'                                        │ │
    │ │                │   │   │   dag_id='example_athena'                                           │ │
    │ │                │   │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                 │ │
    │ │                │   │   │   max_tries=0                                                       │ │
    │ │                │   │   │   task=<class                                                       │ │
    │ │                'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>           │ │
    │ │                │   │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,      │ │
    │ │                tzinfo=datetime.timezone.utc)                                                 │ │
    │ │                │   │   >,                                                                    │ │
    │ │                │   │   'outlet_events':                                                      │ │
    │ │                <airflow.sdk.execution_time.context.OutletEventAccessors object at            │ │
    │ │                0x7416447b5360>,                                                              │ │
    │ │                │   │   'inlet_events': InletEventsAccessors(                                 │ │
    │ │                │   │   │   _inlets=[],                                                       │ │
    │ │                │   │   │   _assets={},                                                       │ │
    │ │                │   │   │   _asset_aliases={}                                                 │ │
    │ │                │   │   ),                                                                    │ │
    │ │                │   │   ... +20                                                               │ │
    │ │                │   }                                                                         │ │
    │ │                }                                                                             │ │
    │ │         self = <Task(S3CreateBucketOperator): create_s3_bucket>                              │ │
    │ │     sentinel = <Task(S3CreateBucketOperator): create_s3_bucket>                              │ │
    │ │ sentinel_key = 'S3CreateBucketOperator__sentinel'                                            │ │
    │ │        stack = <contextlib.ExitStack object at 0x741645e83550>                               │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/operators/s3.py:79 in execute     │
    │                                                                                                  │
    │    76 │   │   self.bucket_name = bucket_name                                                     │
    │    77 │                                                                                          │
    │    78 │   def execute(self, context: Context):                                                   │
    │ ❱  79 │   │   if not self.hook.check_for_bucket(self.bucket_name):                               │
    │    80 │   │   │   self.hook.create_bucket(bucket_name=self.bucket_name, region_name=self.regio   │
    │    81 │   │   │   self.log.info("Created bucket with name: %s", self.bucket_name)                │
    │    82 │   │   else:                                                                              │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │ context = {                                                                                  │ │
    │ │           │   'dag': <DAG dag_id='example_athena' schedule='@once' #tasks=13>,               │ │
    │ │           │   'inlets': [],                                                                  │ │
    │ │           │   'map_index_template': None,                                                    │ │
    │ │           │   'outlets': [],                                                                 │ │
    │ │           │   'run_id': 'manual__2025-07-03T18:33:34.723934+00:00',                          │ │
    │ │           │   'task': <Task(S3CreateBucketOperator): create_s3_bucket>,                      │ │
    │ │           │   'task_instance': <RuntimeTaskInstance                                          │ │
    │ │           │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                            │ │
    │ │           │   │   task_id='create_s3_bucket'                                                 │ │
    │ │           │   │   dag_id='example_athena'                                                    │ │
    │ │           │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                          │ │
    │ │           │   │   max_tries=0                                                                │ │
    │ │           │   │   task=<class                                                                │ │
    │ │           'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>                │ │
    │ │           │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,               │ │
    │ │           tzinfo=datetime.timezone.utc)                                                      │ │
    │ │           │   >,                                                                             │ │
    │ │           │   'ti': <RuntimeTaskInstance                                                     │ │
    │ │           │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                            │ │
    │ │           │   │   task_id='create_s3_bucket'                                                 │ │
    │ │           │   │   dag_id='example_athena'                                                    │ │
    │ │           │   │   run_id='manual__2025-07-03T18:33:34.723934+00:00'                          │ │
    │ │           │   │   max_tries=0                                                                │ │
    │ │           │   │   task=<class                                                                │ │
    │ │           'airflow.providers.amazon.aws.operators.s3.S3CreateBucketOperator'>                │ │
    │ │           │   │   start_date=datetime.datetime(2025, 7, 3, 18, 33, 38, 546084,               │ │
    │ │           tzinfo=datetime.timezone.utc)                                                      │ │
    │ │           │   >,                                                                             │ │
    │ │           │   'outlet_events': <airflow.sdk.execution_time.context.OutletEventAccessors      │ │
    │ │           object at 0x7416447b5360>,                                                         │ │
    │ │           │   'inlet_events': InletEventsAccessors(                                          │ │
    │ │           │   │   _inlets=[],                                                                │ │
    │ │           │   │   _assets={},                                                                │ │
    │ │           │   │   _asset_aliases={}                                                          │ │
    │ │           │   ),                                                                             │ │
    │ │           │   ... +20                                                                        │ │
    │ │           }                                                                                  │ │
    │ │    self = <Task(S3CreateBucketOperator): create_s3_bucket>                                   │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:126 in wrapper        │
    │                                                                                                  │
    │    123 │   │   │   │   if "bucket_name" in self.service_config:                                  │
    │    124 │   │   │   │   │   bound_args.arguments["bucket_name"] = self.service_config["bucket_na  │
    │    125 │   │   │                                                                                 │
    │ ❱  126 │   │   │   return func(*bound_args.args, **bound_args.kwargs)                            │
    │    127 │                                                                                         │
    │    128 │   return wrapper                                                                        │
    │    129                                                                                           │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │               args = (                                                                       │ │
    │ │                      │   <airflow.providers.amazon.aws.hooks.s3.S3Hook object at             │ │
    │ │                      0x74164418bee0>,                                                        │ │
    │ │                      │   'env64c0c6c2-athena-bucket'                                         │ │
    │ │                      )                                                                       │ │
    │ │         bound_args = <BoundArguments (self=<airflow.providers.amazon.aws.hooks.s3.S3Hook     │ │
    │ │                      object at 0x74164418bee0>, bucket_name='env64c0c6c2-athena-bucket')>    │ │
    │ │ function_signature = <Signature (self, bucket_name: 'str | None' = None) -> 'bool'>          │ │
    │ │             kwargs = {}                                                                      │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/s3.py:307 in                │
    │ check_for_bucket                                                                                 │
    │                                                                                                  │
    │    304 │   │   :return: True if it exists and False if not.                                      │
    │    305 │   │   """                                                                               │
    │    306 │   │   try:                                                                              │
    │ ❱  307 │   │   │   self.get_conn().head_bucket(Bucket=bucket_name)                               │
    │    308 │   │   │   return True                                                                   │
    │    309 │   │   except ClientError as e:                                                          │
    │    310 │   │   │   # The head_bucket api is odd in that it cannot return proper                  │
    │                                                                                                  │
    │ ╭─────────────────────────────────────── locals ────────────────────────────────────────╮        │
    │ │ bucket_name = 'env64c0c6c2-athena-bucket'                                             │        │
    │ │        self = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0> │        │
    │ ╰───────────────────────────────────────────────────────────────────────────────────────╯        │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py:846 in get_conn │
    │                                                                                                  │
    │    843 │   │   :return: boto3.client or boto3.resource                                           │
    │    844 │   │   """                                                                               │
    │    845 │   │   # Compat shim                                                                     │
    │ ❱  846 │   │   return self.conn                                                                  │
    │    847 │                                                                                         │
    │    848 │   def get_credentials(self, region_name: str | None = None) -> ReadOnlyCredentials:     │
    │    849 │   │   """                                                                               │
    │                                                                                                  │
    │ ╭──────────────────────────────────── locals ────────────────────────────────────╮               │
    │ │ self = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0> │               │
    │ ╰────────────────────────────────────────────────────────────────────────────────╯               │
    │                                                                                                  │
    │ /usr/local/lib/python3.10/functools.py:981 in __get__                                            │
    │                                                                                                  │
    │   978 │   │   │   │   # check if another thread filled cache while we awaited lock               │
    │   979 │   │   │   │   val = cache.get(self.attrname, _NOT_FOUND)                                 │
    │   980 │   │   │   │   if val is _NOT_FOUND:                                                      │
    │ ❱ 981 │   │   │   │   │   val = self.func(instance)                                              │
    │   982 │   │   │   │   │   try:                                                                   │
    │   983 │   │   │   │   │   │   cache[self.attrname] = val                                         │
    │   984 │   │   │   │   │   except TypeError:                                                      │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │    cache = {                                                                                 │ │
    │ │            │   '_requester_pays': False,                                                     │ │
    │ │            │   'transfer_config': <boto3.s3.transfer.TransferConfig object at                │ │
    │ │            0x74164418b040>,                                                                  │ │
    │ │            │   '_extra_args': {},                                                            │ │
    │ │            │   '_log_config_logger_name': 'airflow.task.hooks',                              │ │
    │ │            │   '_logger_name': None,                                                         │ │
    │ │            │   'aws_conn_id': 'aws_default',                                                 │ │
    │ │            │   'client_type': 's3',                                                          │ │
    │ │            │   'resource_type': None,                                                        │ │
    │ │            │   '_region_name': None,                                                         │ │
    │ │            │   '_config': None,                                                              │ │
    │ │            │   ... +1                                                                        │ │
    │ │            }                                                                                 │ │
    │ │ instance = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0>           │ │
    │ │     self = <functools.cached_property object at 0x741675d58760>                              │ │
    │ │      val = <object object at 0x741682b842d0>                                                 │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py:781 in conn     │
    │                                                                                                  │
    │    778 │   │   :return: boto3.client or boto3.resource                                           │
    │    779 │   │   """                                                                               │
    │    780 │   │   if self.client_type:                                                              │
    │ ❱  781 │   │   │   return self.get_client_type(region_name=self.region_name)                     │
    │    782 │   │   return self.get_resource_type(region_name=self.region_name)                       │
    │    783 │                                                                                         │
    │    784 │   @property                                                                             │
    │                                                                                                  │
    │ ╭──────────────────────────────────── locals ────────────────────────────────────╮               │
    │ │ self = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0> │               │
    │ ╰────────────────────────────────────────────────────────────────────────────────╯               │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py:683 in          │
    │ region_name                                                                                      │
    │                                                                                                  │
    │    680 │   @property                                                                             │
    │    681 │   def region_name(self) -> str | None:                                                  │
    │    682 │   │   """AWS Region Name read-only property."""                                         │
    │ ❱  683 │   │   return self.conn_config.region_name                                               │
    │    684 │                                                                                         │
    │    685 │   @property                                                                             │
    │    686 │   def config(self) -> Config:                                                           │
    │                                                                                                  │
    │ ╭──────────────────────────────────── locals ────────────────────────────────────╮               │
    │ │ self = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0> │               │
    │ ╰────────────────────────────────────────────────────────────────────────────────╯               │
    │                                                                                                  │
    │ /usr/local/lib/python3.10/functools.py:981 in __get__                                            │
    │                                                                                                  │
    │   978 │   │   │   │   # check if another thread filled cache while we awaited lock               │
    │   979 │   │   │   │   val = cache.get(self.attrname, _NOT_FOUND)                                 │
    │   980 │   │   │   │   if val is _NOT_FOUND:                                                      │
    │ ❱ 981 │   │   │   │   │   val = self.func(instance)                                              │
    │   982 │   │   │   │   │   try:                                                                   │
    │   983 │   │   │   │   │   │   cache[self.attrname] = val                                         │
    │   984 │   │   │   │   │   except TypeError:                                                      │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │    cache = {                                                                                 │ │
    │ │            │   '_requester_pays': False,                                                     │ │
    │ │            │   'transfer_config': <boto3.s3.transfer.TransferConfig object at                │ │
    │ │            0x74164418b040>,                                                                  │ │
    │ │            │   '_extra_args': {},                                                            │ │
    │ │            │   '_log_config_logger_name': 'airflow.task.hooks',                              │ │
    │ │            │   '_logger_name': None,                                                         │ │
    │ │            │   'aws_conn_id': 'aws_default',                                                 │ │
    │ │            │   'client_type': 's3',                                                          │ │
    │ │            │   'resource_type': None,                                                        │ │
    │ │            │   '_region_name': None,                                                         │ │
    │ │            │   '_config': None,                                                              │ │
    │ │            │   ... +1                                                                        │ │
    │ │            }                                                                                 │ │
    │ │ instance = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at 0x74164418bee0>           │ │
    │ │     self = <functools.cached_property object at 0x741675d5b880>                              │ │
    │ │      val = <object object at 0x741682b842d0>                                                 │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/providers/amazon/src/airflow/providers/amazon/aws/hooks/base_aws.py:634 in          │
    │ conn_config                                                                                      │
    │                                                                                                  │
    │    631 │   │   │   │   possible_exceptions = (AirflowNotFoundException,)                         │
    │    632 │   │   │                                                                                 │
    │    633 │   │   │   try:                                                                          │
    │ ❱  634 │   │   │   │   connection = self.get_connection(self.aws_conn_id)                        │
    │    635 │   │   │   except possible_exceptions as e:                                              │
    │    636 │   │   │   │   if isinstance(                                                            │
    │    637 │   │   │   │   │   e, AirflowNotFoundException                                           │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │          connection = None                                                                   │ │
    │ │ possible_exceptions = (                                                                      │ │
    │ │                       │   <class 'airflow.exceptions.AirflowNotFoundException'>,             │ │
    │ │                       │   <class 'airflow.sdk.exceptions.AirflowRuntimeError'>               │ │
    │ │                       )                                                                      │ │
    │ │                self = <airflow.providers.amazon.aws.hooks.s3.S3Hook object at                │ │
    │ │                       0x74164418bee0>                                                        │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    │                                                                                                  │
    │ /opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py:65 in get_connection                         │
    │                                                                                                  │
    │   62 │   │   if hasattr(sys.modules.get("airflow.sdk.execution_time.task_runner"), "SUPERVISO    │
    │   63 │   │   │   from airflow.sdk.definitions.connection import Connection                       │
    │   64 │   │   │                                                                                   │
    │ ❱ 65 │   │   │   conn = Connection.get(conn_id)                                                  │
    │   66 │   │   │   log.info("Connection Retrieved '%s' (via task-sdk)", conn.conn_id)              │
    │   67 │   │   │   return conn                                                                     │
    │   68 │   │   from airflow.models.connection import Connection as ConnectionModel                 │
    │                                                                                                  │
    │ ╭────────────── locals ───────────────╮                                                          │
    │ │ conn_id = 'aws_default'             │                                                          │
    │ │     sys = <module 'sys' (built-in)> │                                                          │
    │ ╰─────────────────────────────────────╯                                                          │
    │                                                                                                  │
    │ /opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py:152 in get                       │
    │                                                                                                  │
    │   149 │   def get(cls, conn_id: str) -> Any:                                                     │
    │   150 │   │   from airflow.sdk.execution_time.context import _get_connection                     │
    │   151 │   │                                                                                      │
    │ ❱ 152 │   │   return _get_connection(conn_id)                                                    │
    │   153 │                                                                                          │
    │   154 │   @property                                                                              │
    │   155 │   def extra_dejson(self) -> dict:                                                        │
    │                                                                                                  │
    │ ╭──────── locals ─────────╮                                                                      │
    │ │ conn_id = 'aws_default' │                                                                      │
    │ ╰─────────────────────────╯                                                                      │
    │                                                                                                  │
    │ /opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py:158 in _get_connection           │
    │                                                                                                  │
    │   155 │   msg = SUPERVISOR_COMMS.send(GetConnection(conn_id=conn_id))                            │
    │   156 │                                                                                          │
    │   157 │   if isinstance(msg, ErrorResponse):                                                     │
    │ ❱ 158 │   │   raise AirflowRuntimeError(msg)                                                     │
    │   159 │                                                                                          │
    │   160 │   if TYPE_CHECKING:                                                                      │
    │   161 │   │   assert isinstance(msg, ConnectionResult)                                           │
    │                                                                                                  │
    │ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
    │ │         backends = [                                                                         │ │
    │ │                    │   <airflow.secrets.environment_variables.EnvironmentVariablesBackend    │ │
    │ │                    object at 0x7416441d2bc0>                                                 │ │
    │ │                    ]                                                                         │ │
    │ │             conn = None                                                                      │ │
    │ │          conn_id = 'aws_default'                                                             │ │
    │ │              msg = ErrorResponse(                                                            │ │
    │ │                    │   error=<ErrorType.CONNECTION_NOT_FOUND: 'CONNECTION_NOT_FOUND'>,       │ │
    │ │                    │   detail={'conn_id': 'aws_default'},                                    │ │
    │ │                    │   type='ErrorResponse'                                                  │ │
    │ │                    )                                                                         │ │
    │ │  secrets_backend = <airflow.secrets.environment_variables.EnvironmentVariablesBackend object │ │
    │ │                    at 0x7416441d2bc0>                                                        │ │
    │ │ SUPERVISOR_COMMS = InProcessSupervisorComms(                                                 │ │
    │ │                    │   supervisor=<InProcessTestSupervisor                                   │ │
    │ │                    │   │   id=UUID('0197d190-ca87-71e9-8e14-68b49f9d130d')                   │ │
    │ │                    │   │   pid=251                                                           │ │
    │ │                    │   >,                                                                    │ │
    │ │                    │   messages=deque()                                                      │ │
    │ │                    )                                                                         │ │
    │ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
    ╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
    AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'aws_default'}
    ```

ramitkataria added a commit to aws-mwaa/upstream-to-airflow that referenced this pull request Jul 4, 2025
…OUND` Task SDK exception

After apache#51873, the base hook's `get_connection` exception for when
connection is not found was changed and was no longer being caught by
`AwsGenericHook`. This change fixes that by using a robust approach.

At some point, we should probably make the connection getter raise
exceptions in a more consistent manner, maybe once we fully switch over
to Task SDK.
@ramitkataria
Copy link
Contributor

The cause of this error was a change in the string representation of the exception after this PR was merged. In #52838, I changed it to a what seems like a more robust way of catching the "connection not found" exception. But I think at some point, we should probably make the connection getter raise exceptions in a more consistent manner (maybe once we fully switch over to Task SDK) so that callers of that method can have a simple try/except to handle the not found case

@amoghrajesh
Copy link
Contributor Author

The cause of this error was a change in the string representation of the exception after this PR was merged. In #52838, I changed it to a what seems like a more robust way of catching the "connection not found" exception. But I think at some point, we should probably make the connection getter raise exceptions in a more consistent manner (maybe once we fully switch over to Task SDK) so that callers of that method can have a simple try/except to handle the not found case

Hey @ramitkataria what string representation changed? Could you elaborate a bit there?

ferruzzi pushed a commit that referenced this pull request Jul 4, 2025
…OUND` Task SDK exception (#52838)

After #51873, the base hook's `get_connection` exception for when
connection is not found was changed and was no longer being caught by
`AwsGenericHook`. This change fixes that by using a robust approach.

At some point, we should probably make the connection getter raise
exceptions in a more consistent manner, maybe once we fully switch over
to Task SDK.
@jscheffl
Copy link
Contributor

jscheffl commented Jul 5, 2025

@amoghrajesh I have seen strange plugin initialization errors in https://apache-airflow.slack.com/archives/C06K9Q5G2UA/p1751744572678519 - is this related to this PR?

@jscheffl
Copy link
Contributor

jscheffl commented Jul 5, 2025

@amoghrajesh Also antoher thing, maybe related? Started breeze on main via breeze start-airflow --python 3.12 --backend postgres --executor EdgeExecutor --load-example-dags and noticed that my previous integration test via dag providers/edge3/src/airflow/providers/edge3/example_dags/integration_test.py fails in task connection with the error:

[2025-07-05, 21:35:08] ERROR - Task failed with exception: source="task"
AirflowRuntimeError: CONNECTION_NOT_FOUND: {'conn_id': 'integration_test'}
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 875 in run
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/task_runner.py", line 1162 in _execute_task
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/decorator.py", line 251 in execute
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/operator.py", line 397 in wrapper
File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 217 in execute
File "/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py", line 240 in execute_callable
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/callback_runner.py", line 82 in run
File "/files/dags/integration_test.py", line 134 in connection
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 65 in get_connection
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 152 in get
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 158 in _get_connection

Is this related to this PR? Or shall I open a separate issue ticket?

@amoghrajesh
Copy link
Contributor Author

@jscheffl could be related. Can you please create issues(either for one for both)?

I can only look on Monday, wont be around tomorrow

@jscheffl
Copy link
Contributor

jscheffl commented Jul 5, 2025

breeze start-airflow --python 3.12 --backend postgres --executor EdgeExecutor --load-example-dags

#52921

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Move BaseHook to TaskSDK under airflow.sdk.bases

7 participants