Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airflow-core/src/airflow/models/dag_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def write_dag(
)
log.debug("Writing DagVersion %s to the DB", dag_version)
session.add(dag_version)
session.commit()
log.debug("DagVersion %s written to the DB", dag_version)
return dag_version

Expand Down
2 changes: 1 addition & 1 deletion airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3793,7 +3793,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
SerializedDagModel.write_dag(
LazyDeserializedDAG.from_dag(dag), bundle_name="testing", session=session
)

session.commit()
dag_version_2 = DagVersion.get_latest_version(dr.dag_id, session=session)
assert dag_version_2 != dag_version_1

Expand Down
54 changes: 54 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,57 @@ def test_dynamic_dag_update_success(self, dag_maker, session):
updated_sdag = SDM.get("test_dynamic_success", session=session)
assert updated_sdag.dag_hash != initial_hash # Hash should change
assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now

def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session):
"""
Test that SerializedDagModel.write_dag maintains atomicity.

If DagCode.write_code fails, the entire transaction should rollback,
including the DagVersion. This test verifies that DagVersion is not
committed separately, which would leave orphaned records.

This test would fail if DagVersion.write_dag() was used (which commits
immediately), because the DagVersion would be persisted even though
the rest of the transaction failed.
"""
from airflow.models.dagcode import DagCode

with dag_maker("test_atomicity_dag"):
EmptyOperator(task_id="task1")

dag = dag_maker.dag
initial_version_count = session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
assert initial_version_count == 1, "Should have one DagVersion after initial write"
dag_maker.create_dagrun() # ensure the second dag version is created

EmptyOperator(task_id="task2", dag=dag)
modified_lazy_dag = LazyDeserializedDAG.from_dag(dag)

# Mock DagCode.write_code to raise an exception
with mock.patch.object(
DagCode, "write_code", side_effect=RuntimeError("Simulated DagCode.write_code failure")
):
with pytest.raises(RuntimeError, match="Simulated DagCode.write_code failure"):
SDM.write_dag(
dag=modified_lazy_dag,
bundle_name="testing",
bundle_version=None,
session=session,
)
session.rollback()

# Verify that no new DagVersion was committed
# Use a fresh session to ensure we're reading from committed data
with create_session() as fresh_session:
final_version_count = (
fresh_session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count()
)
assert final_version_count == initial_version_count, (
"DagVersion should not be committed when DagCode.write_code fails"
)

sdag = SDM.get(dag.dag_id, session=fresh_session)
assert sdag is not None, "Original SerializedDagModel should still exist"
assert len(sdag.dag.task_dict) == 1, (
"SerializedDagModel should not be updated when write fails"
)