From 1d729ca316c8c24835659080eba7464a85927b0a Mon Sep 17 00:00:00 2001 From: Anshu Singh Date: Wed, 22 Oct 2025 10:31:49 +0530 Subject: [PATCH] Update SerializedDagModel query to fetch DagVersion with joinedload --- airflow-core/src/airflow/models/serialized_dag.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index e5df49e2a952b..c584ab27d92dc 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -29,7 +29,7 @@ import uuid6 from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_ from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import Mapped, backref, foreign, relationship +from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship from sqlalchemy.sql.expression import func, literal from sqlalchemy_utils import UUIDType @@ -414,7 +414,14 @@ def write_dag( serialized_dag_hash = session.scalars( select(cls.dag_hash).where(cls.dag_id == dag.dag_id).order_by(cls.created_at.desc()) ).first() - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + dag_version = session.scalar( + select(DagVersion) + .where(DagVersion.dag_id == dag.dag_id) + .options(joinedload(DagVersion.task_instances)) + .options(joinedload(DagVersion.serialized_dag)) + .order_by(DagVersion.created_at.desc()) + .limit(1) + ) if ( serialized_dag_hash == new_serialized_dag.dag_hash @@ -429,8 +436,8 @@ def write_dag( # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances latest_ser_dag = cls.get(dag.dag_id, session=session) - if TYPE_CHECKING: - assert latest_ser_dag is not None + if not latest_ser_dag: + return False # Update the serialized DAG with the new_serialized_dag latest_ser_dag._data = new_serialized_dag._data latest_ser_dag._data_compressed = new_serialized_dag._data_compressed