Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import uuid6
from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, relationship
from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType

Expand Down Expand Up @@ -414,7 +414,14 @@ def write_dag(
serialized_dag_hash = session.scalars(
select(cls.dag_hash).where(cls.dag_id == dag.dag_id).order_by(cls.created_at.desc())
).first()
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dag_version = session.scalar(
select(DagVersion)
.where(DagVersion.dag_id == dag.dag_id)
.options(joinedload(DagVersion.task_instances))
.options(joinedload(DagVersion.serialized_dag))
.order_by(DagVersion.created_at.desc())
.limit(1)
)

if (
serialized_dag_hash == new_serialized_dag.dag_hash
Expand All @@ -429,8 +436,8 @@ def write_dag(
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
latest_ser_dag = cls.get(dag.dag_id, session=session)
if TYPE_CHECKING:
assert latest_ser_dag is not None
if not latest_ser_dag:
return False
# Update the serialized DAG with the new_serialized_dag
latest_ser_dag._data = new_serialized_dag._data
latest_ser_dag._data_compressed = new_serialized_dag._data_compressed
Expand Down