-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Description
Apache Airflow version
3.1.5
If "Other Airflow 3 version" selected, which one?
No response
What happened?
DAG version keeps incrementing without any changes to the DAG.
Operator template fields may be provided via callable of the form: def fn(context, jinja_env)
These template field callables are serialized via default str which includes current address of the callable: <function fn at 0x7592eb0cb380>
The address of the loaded callable changes between iterations of the dag-processor DAG refresh loop, and since DAG versioning depends on stable serialization, this leads to DAG version explosion.
What you think should happen instead?
DAG version should stay the same when there are no changes.
Operator template fields serialization is treated separately here and serialize_template_field does not handle callable case.
Instead template field callables should be serialized via already existing logic here.
How to reproduce
Create a DAG with a single task with template fields, configure any of the template fields to be provided via callable: def fn(context, jinja_env)
DAG version will increment without any changes.
Operating System
Ubuntu 24.04.3 LTS
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.18.1
apache-airflow-providers-cncf-kubernetes==10.11.1
apache-airflow-providers-common-compat==1.10.1
apache-airflow-providers-common-io==1.7.0
apache-airflow-providers-common-sql==1.30.1
apache-airflow-providers-databricks==7.8.1
apache-airflow-providers-datadog==3.10.1
apache-airflow-providers-ftp==3.14.0
apache-airflow-providers-http==5.6.1
apache-airflow-providers-mysql==6.4.0
apache-airflow-providers-openlineage==2.9.1
apache-airflow-providers-pagerduty==5.2.1
apache-airflow-providers-sftp==5.5.1
apache-airflow-providers-slack==9.6.1
apache-airflow-providers-smtp==2.4.1
apache-airflow-providers-ssh==4.2.1
apache-airflow-providers-standard==1.10.1
apache-airflow-providers-trino==6.4.1
Deployment
Virtualenv installation
Deployment details
No response
Anything else?
Example DAG:
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.context import Context
import jinja2
def get_env(context: Context, jinja_env: jinja2.Environment):
return {"SLEED_DURATION": "5"}
with DAG(
dag_id="simple_bash_dag",
start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
schedule=None,
catchup=False,
tags=["example", "bash"]
) as dag:
# Task 1: Print the current date
task_print_date = BashOperator(
task_id="print_current_date",
bash_command="echo 'Today is: ' && date",
)
# Task 2: Sleep for 5 seconds
task_sleep = BashOperator(
task_id="sleep_for_five_seconds",
bash_command="sleep $SLEED_DURATION",
env=get_env,
)
# Define task dependencies
# task_print_date must complete successfully before task_sleep starts
task_print_date >> task_sleeprun it a couple of times or schedule, a new version is run every time:
same DAG, different versions:
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct