From 7b46446aa67253f7acba26179636926175083d0d Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Thu, 22 Jan 2026 17:48:33 +0530 Subject: [PATCH 1/3] Optimize write_dag to use EXISTS query instead of joinedload --- airflow-core/src/airflow/models/serialized_dag.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index a019ed6377f45..bd499a1a17953 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -27,9 +27,9 @@ import sqlalchemy_jsonfield import uuid6 -from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update +from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship +from sqlalchemy.orm import Mapped, backref, foreign, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -45,6 +45,7 @@ from airflow.models.dagcode import DagCode from airflow.models.dagrun import DagRun from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel +from airflow.models.taskinstance import TaskInstance from airflow.serialization.dag_dependency import DagDependency from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey from airflow.serialization.definitions.deadline import DeadlineAlertFields @@ -486,7 +487,6 @@ def write_dag( dag_version = session.scalar( select(DagVersion) .where(DagVersion.dag_id == dag.dag_id) - .options(joinedload(DagVersion.task_instances)) .order_by(DagVersion.created_at.desc()) .limit(1) ) @@ -520,7 +520,13 @@ def write_dag( log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id) return False - if dag_version and not dag_version.task_instances: + has_task_instances: bool = False + if dag_version: + has_task_instances = bool( + session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id))) + ) + + if dag_version and not has_task_instances: # This is for dynamic DAGs that the hashes changes often. We should update # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances From b82284d003a627dde4c278906ddf3fadf7ecbc2c Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Thu, 22 Jan 2026 13:47:43 -0700 Subject: [PATCH 2/3] Update airflow-core/src/airflow/models/serialized_dag.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- airflow-core/src/airflow/models/serialized_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index bd499a1a17953..e334cb403abdb 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -522,8 +522,8 @@ def write_dag( has_task_instances: bool = False if dag_version: - has_task_instances = bool( - session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id))) + has_task_instances = session.scalar( + select(exists().where(TaskInstance.dag_version_id == dag_version.id)) ) if dag_version and not has_task_instances: From 8d27fde3770beb84d2a3be3ec9ef771bef6d4654 Mon Sep 17 00:00:00 2001 From: vatsrahul1001 Date: Fri, 23 Jan 2026 07:47:09 +0530 Subject: [PATCH 3/3] Revert "Update airflow-core/src/airflow/models/serialized_dag.py" This reverts commit b82284d003a627dde4c278906ddf3fadf7ecbc2c. --- airflow-core/src/airflow/models/serialized_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index e334cb403abdb..bd499a1a17953 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -522,8 +522,8 @@ def write_dag( has_task_instances: bool = False if dag_version: - has_task_instances = session.scalar( - select(exists().where(TaskInstance.dag_version_id == dag_version.id)) + has_task_instances = bool( + session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id))) ) if dag_version and not has_task_instances: