diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index a019ed6377f45..bd499a1a17953 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -27,9 +27,9 @@ import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update +from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship +from sqlalchemy.orm import Mapped, backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -45,6 +45,7 @@ from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel +from airflow.models.taskinstance import TaskInstance from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey from airflow.serialization.definitions.deadline import DeadlineAlertFields @@ -486,7 +487,6 @@ def write_dag( dag_version = session.scalar( select(DagVersion) .where(DagVersion.dag_id == dag.dag_id) - .options(joinedload(DagVersion.task_instances)) .order_by(DagVersion.created_at.desc()) .limit(1) ) @@ -520,7 +520,13 @@ def write_dag( log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id) return False - if dag_version and not dag_version.task_instances: + has_task_instances: bool = False + if dag_version: + has_task_instances = bool( + session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id))) + ) + + if dag_version and not has_task_instances: # This is for dynamic DAGs that the hashes changes often. We should update # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances