@kaxil kaxil commented Jun 12, 2025

The `dag.test` method currently (in 3.0.2) clears DAG runs before running the DAG ([code](https://github.com/apache/airflow/blob/3.0.2/task-sdk/src/airflow/sdk/definitions/dag.py#L1092-L1097)). To do so it calls `SchedulerDAG.clear_dags`, which creates a `SchedulerDAG` from the Task SDK DAG using [`copy.deepcopy`](https://github.com/apache/airflow/blob/3.0.2/airflow-core/src/airflow/models/dag.py#L1797).

That works in most cases, but not when a `jinja2.Template` object is passed as a Task/Operator argument, because `Template` objects cannot be deep-copied (see [this issue](pallets/jinja#758)).

```python
In [1]: from copy import deepcopy
   ...:
   ...: from jinja2 import Template
   ...:
   ...: deepcopy(Template(''))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[1], line 5
      1 from copy import deepcopy
      3 from jinja2 import Template
----> 5 deepcopy(Template(''))

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copy.py:162, in deepcopy(x, memo, _nil)
    160                 y = x
    161             else:
--> 162                 y = _reconstruct(x, memo, *rv)
    164 # If is its own copy, don't memoize.
    165 if y is not x:

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copy.py:253, in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
    251 if deep and args:
    252     args = (deepcopy(arg, memo) for arg in args)
--> 253 y = func(*args)
    254 if deep:
    255     memo[id(x)] = y

File ~/.local/share/uv/python/cpython-3.12.8-macos-aarch64-none/lib/python3.12/copyreg.py:99, in __newobj__(cls, *args)
     98 def __newobj__(cls, *args):
---> 99     return cls.__new__(cls, *args)

TypeError: Template.__new__() missing 1 required positional argument: 'source'
```

This is a general issue that can affect any DAG whose custom arguments store or cache Jinja2 `Template` objects, or similar objects that cannot be deep-copied.
Clearly, the longer-term solution for `dag.test` is to stop using `SchedulerDAG` this way and instead call some task execution endpoint to clear DAG runs, but this change works for 3.0.x.
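
For readers who want to see the failure mode without Airflow or Jinja2 installed, here is a minimal sketch using only the standard library. `FakeTemplate` and `FakeOperator` are hypothetical stand-ins (not Airflow or Jinja2 classes): the first mimics a class whose `__new__` requires a positional `source` argument, the second an object that stores such a value as an attribute. The shallow-copy contrast is included only to illustrate why avoiding the recursive copy sidesteps the error; it is an assumption for illustration, not a statement of what the patch does.

```python
import copy


class FakeTemplate:
    """Hypothetical stand-in for jinja2.Template: __new__ requires `source`."""

    def __new__(cls, source):
        obj = super().__new__(cls)
        obj.source = source
        return obj


class FakeOperator:
    """Hypothetical stand-in for a task that stores a template argument."""

    def __init__(self, template):
        self.template = template


task = FakeOperator(FakeTemplate("SELECT 1"))

# deepcopy recurses into task.__dict__ and tries to rebuild the template
# via cls.__new__(cls) with no `source`, so it raises TypeError.
try:
    copy.deepcopy(task)
    deepcopy_error = None
except TypeError as exc:
    deepcopy_error = exc

# A shallow copy builds a new FakeOperator but reuses the same template object,
# so the template's __new__ is never called again.
shallow = copy.copy(task)

print(type(deepcopy_error).__name__)
print(shallow is not task and shallow.template is task.template)
```

Note that the shallow copy shares the stored template with the original object, so it is only appropriate when the copy does not need to mutate that attribute independently.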
@kaxil kaxil requested review from XD-DENG and ashb as code owners June 12, 2025 19:49
@kaxil kaxil requested a review from jedcunningham June 12, 2025 19:49
@kaxil kaxil merged commit 75f8aa9 into apache:main Jun 12, 2025
52 checks passed
@kaxil kaxil deleted the change-to-copy branch June 12, 2025 20:33
github-actions bot pushed a commit that referenced this pull request Jun 12, 2025
…1670)

(cherry picked from commit 75f8aa9)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
@github-actions

Backport successfully created: v3-0-test

| Status | Branch | Result |
| --- | --- | --- |
| | v3-0-test | PR Link |

Lee-W pushed a commit that referenced this pull request Jun 13, 2025
…1670) (#51673)

(cherry picked from commit 75f8aa9)

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
choo121600 pushed a commit to choo121600/airflow that referenced this pull request Jun 14, 2025