Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@

import sqlalchemy_jsonfield
import uuid6
from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
from sqlalchemy import ForeignKey, LargeBinary, String, exists, select, tuple_, update
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
from sqlalchemy.orm import Mapped, backref, foreign, relationship
from sqlalchemy.sql.expression import func, literal
from sqlalchemy_utils import UUIDType

Expand All @@ -45,6 +45,7 @@
from airflow.models.dagcode import DagCode
from airflow.models.dagrun import DagRun
from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey
from airflow.serialization.definitions.deadline import DeadlineAlertFields
Expand Down Expand Up @@ -486,7 +487,6 @@ def write_dag(
dag_version = session.scalar(
select(DagVersion)
.where(DagVersion.dag_id == dag.dag_id)
.options(joinedload(DagVersion.task_instances))
.order_by(DagVersion.created_at.desc())
.limit(1)
)
Expand Down Expand Up @@ -520,7 +520,13 @@ def write_dag(
log.debug("Serialized DAG (%s) is unchanged. Skipping writing to DB", dag.dag_id)
return False

if dag_version and not dag_version.task_instances:
has_task_instances: bool = False
if dag_version:
has_task_instances = bool(
session.scalar(select(exists().where(TaskInstance.dag_version_id == dag_version.id)))
)

if dag_version and not has_task_instances:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
Expand Down