dag.test() fails when HttpHook is initialized in Operator init method #53371

@aa-matthias

Apache Airflow version

3.0.3

If "Other Airflow 2 version" selected, which one?

No response

What happened?

To avoid initializing HttpHook redundantly in multiple class methods, and to reuse the connection across those methods, we initialize the hook in the operator's __init__ method. While this works fine, debugging such a task is no longer possible in Airflow since 3.0.0.

Debugging the task via

if __name__ == "__main__":
    dag_object.test()

works fine in previous versions of Airflow (e.g. 2.10.0).
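
For illustration, the pattern described above can be sketched with stand-in classes and no Airflow import. `FakeSession`, `EagerOperator`, `LazyOperator` and `can_deepcopy` are hypothetical names, not Airflow or provider APIs. The sketch only shows that an object created in `__init__` is part of the instance state that `copy.deepcopy` walks, while one created on first access through `functools.cached_property` is not present until it is used. Whether deferring creation this way avoids the `dag.test()` error in Airflow 3 is an assumption that has not been verified here.

```python
import copy
from functools import cached_property


class FakeSession:
    """Hypothetical stand-in for an HTTP session that cannot be deep-copied."""

    def __deepcopy__(self, memo):
        raise AttributeError("session state cannot be copied")


class EagerOperator:
    """Creates the shared client in __init__, as in the report."""

    def __init__(self):
        # The session is stored in the instance __dict__ immediately,
        # so any deepcopy of the operator also tries to copy it.
        self.session = FakeSession()


class LazyOperator:
    """Creates the shared client on first access instead."""

    @cached_property
    def session(self):
        # Stored in the instance __dict__ only after the first access,
        # then reused by every later access on the same instance.
        return FakeSession()


def can_deepcopy(obj):
    """Return True if copy.deepcopy(obj) succeeds, False on AttributeError."""
    try:
        copy.deepcopy(obj)
        return True
    except AttributeError:
        return False


eager_ok = can_deepcopy(EagerOperator())
lazy_ok = can_deepcopy(LazyOperator())
print(eager_ok, lazy_ok)
```

Both variants share one session per operator instance across methods. The lazy variant only stays copyable as long as nothing touches `session` before the copy is made.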

Full Stack Trace:

'TCPKeepAliveAdapter' object has no attribute 'socket_options'
  File "/venv/lib/python3.12/site-packages/requests_toolbelt/adapters/socket_options.py", line 66, in init_poolmanager
    socket_options=self.socket_options
                   ^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/requests/adapters.py", line 236, in __setstate__
    self._pool_connections, self._pool_maxsize, block=self._pool_block
        )
  File "/usr/lib/python3.12/copy.py", line 261, in _reconstruct
    y.__setstate__(state)
  File "/usr/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1264, in __deepcopy__
    v = copy.deepcopy(v, memo)
        ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/airflow/sdk/definitions/dag.py", line 752, in __deepcopy__
    object.__setattr__(result, k, copy.deepcopy(v, memo))
                                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 143, in deepcopy
    y = copier(memo)
        ^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 221, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
                             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 136, in deepcopy
    y = copier(x, memo)
        ^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 259, in _reconstruct
    state = deepcopy(state, memo)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/copy.py", line 162, in deepcopy
    y = _reconstruct(x, memo, *rv)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/airflow/models/dag.py", line 1779, in create_task_groups
    new_task_group = copy.deepcopy(task_group)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/airflow/models/dag.py", line 1791, in from_sdk_dag
    create_task_groups(dag.task_group)
  File "/venv/lib/python3.12/site-packages/airflow/models/dag.py", line 1459, in clear_dags
    dag = DAG.from_sdk_dag(dag)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/venv/lib/python3.12/site-packages/airflow/sdk/definitions/dag.py", line 1096, in test
    dags=[self],
                start_date=logical_date,
                end_date=logical_date,
                dag_run_state=False,  # type: ignore
            )
  File "/home/matthias/airflow/dags/fxl_to_spf_products.py", line 154, in <module>
    _dag.test()
  File "/usr/lib/python3.12/runpy.py", line 88, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.12/runpy.py", line 198, in _run_module_as_main (Current frame)
    return _run_code(code, main_globals, None,
AttributeError: 'TCPKeepAliveAdapter' object has no attribute 'socket_options'
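
The trace lists the innermost frame first: `copy.deepcopy` rebuilds the adapter through `__setstate__`, which calls `init_poolmanager`, which reads `self.socket_options`. A plausible reading is that the copied object never ran the subclass `__init__` and that `socket_options` is not part of the saved state. The sketch below reproduces that mechanism with simplified stand-in classes (`BaseAdapter` and `KeepAliveAdapter` are hypothetical and only mimic the shape suggested by the trace; they are not the real `requests` or `requests_toolbelt` code).

```python
import copy


class BaseAdapter:
    """Simplified stand-in for a base adapter that saves only listed attributes."""

    __attrs__ = ["pool_size"]

    def __init__(self, pool_size=10):
        self.pool_size = pool_size
        self.init_poolmanager(pool_size)

    def init_poolmanager(self, pool_size):
        self.pool = {"size": pool_size}

    def __getstate__(self):
        # Only attributes named in __attrs__ end up in the copied state.
        return {attr: getattr(self, attr, None) for attr in self.__attrs__}

    def __setstate__(self, state):
        for attr, value in state.items():
            setattr(self, attr, value)
        # Rebuilds the pool without ever running the subclass __init__.
        self.init_poolmanager(self.pool_size)


class KeepAliveAdapter(BaseAdapter):
    """Stand-in for a subclass that adds an attribute outside __attrs__."""

    def __init__(self, **kwargs):
        self.socket_options = [("SO_KEEPALIVE", 1)]
        super().__init__(**kwargs)

    def init_poolmanager(self, pool_size):
        # Fails on a copied instance because socket_options was never restored.
        self.pool = {"size": pool_size, "socket_options": self.socket_options}


adapter = KeepAliveAdapter()
try:
    copy.deepcopy(adapter)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
print(error_message)
```

In this reduced form the original instance works normally and only the copy fails, which matches the report that the operator itself runs fine while `dag.test()` (which deep-copies the task group) does not.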

What you think should happen instead?

No response

How to reproduce

Please see either of the two minimal reproduction examples attached:

minimal_repo_dag_test_issue_class.txt
minimal_repo_dag_test_issue_dag.txt

Operating System

Ubuntu 24.04

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
