diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index 3f57333333b22..1245f3c9b99b9 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -129,7 +129,6 @@ def write_dag( ) log.debug("Writing DagVersion %s to the DB", dag_version) session.add(dag_version) - session.commit() log.debug("DagVersion %s written to the DB", dag_version) return dag_version diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 5f6e5fdfc6108..4234fb27641f8 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3793,7 +3793,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): SerializedDagModel.write_dag( LazyDeserializedDAG.from_dag(dag), bundle_name="testing", session=session ) - + session.commit() dag_version_2 = DagVersion.get_latest_version(dr.dag_id, session=session) assert dag_version_2 != dag_version_1 diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index ca1c20b5d8d93..92446cee2189c 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -628,3 +628,57 @@ def test_dynamic_dag_update_success(self, dag_maker, session): updated_sdag = SDM.get("test_dynamic_success", session=session) assert updated_sdag.dag_hash != initial_hash # Hash should change assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now + + def test_write_dag_atomicity_on_dagcode_failure(self, dag_maker, session): + """ + Test that SerializedDagModel.write_dag maintains atomicity. + + If DagCode.write_code fails, the entire transaction should roll back, + including the DagVersion. This test verifies that DagVersion is not + committed separately, which would leave orphaned records. 
+ + This test would fail if DagVersion.write_dag() committed the session + on its own, because the DagVersion would be persisted even though + the rest of the transaction failed. + """ + from airflow.models.dagcode import DagCode + + with dag_maker("test_atomicity_dag"): + EmptyOperator(task_id="task1") + + dag = dag_maker.dag + initial_version_count = session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count() + assert initial_version_count == 1, "Should have one DagVersion after initial write" + dag_maker.create_dagrun() # ensure the second dag version is created + + EmptyOperator(task_id="task2", dag=dag) + modified_lazy_dag = LazyDeserializedDAG.from_dag(dag) + + # Mock DagCode.write_code to raise an exception + with mock.patch.object( + DagCode, "write_code", side_effect=RuntimeError("Simulated DagCode.write_code failure") + ): + with pytest.raises(RuntimeError, match="Simulated DagCode.write_code failure"): + SDM.write_dag( + dag=modified_lazy_dag, + bundle_name="testing", + bundle_version=None, + session=session, + ) + session.rollback() + + # Verify that no new DagVersion was committed + # Use a fresh session to ensure we're reading from committed data + with create_session() as fresh_session: + final_version_count = ( + fresh_session.query(DagVersion).filter(DagVersion.dag_id == dag.dag_id).count() + ) + assert final_version_count == initial_version_count, ( + "DagVersion should not be committed when DagCode.write_code fails" + ) + + sdag = SDM.get(dag.dag_id, session=fresh_session) + assert sdag is not None, "Original SerializedDagModel should still exist" + assert len(sdag.dag.task_dict) == 1, ( + "SerializedDagModel should not be updated when write fails" + )