diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index 1e75e86aad825..e4b458084a339 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -27,7 +27,7 @@
 
 import sqlalchemy_jsonfield
 import uuid6
-from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_
+from sqlalchemy import ForeignKey, LargeBinary, String, select, tuple_, update
 from sqlalchemy.dialects.postgresql import JSONB
 from sqlalchemy.orm import Mapped, backref, foreign, joinedload, relationship
 from sqlalchemy.sql.expression import func, literal
@@ -434,14 +434,23 @@ def write_dag(
             # This is for dynamic DAGs that the hashes changes often. We should update
             # the serialized dag, the dag_version and the dag_code instead of a new version
             # if the dag_version is not associated with any task instances
-            latest_ser_dag = cls.get(dag.dag_id, session=session)
-            if not latest_ser_dag:
+
+            # Use direct UPDATE to avoid loading the full serialized DAG
+            result = session.execute(
+                update(cls)
+                .where(cls.dag_version_id == dag_version.id)
+                .values(
+                    {
+                        cls._data: new_serialized_dag._data,
+                        cls._data_compressed: new_serialized_dag._data_compressed,
+                        cls.dag_hash: new_serialized_dag.dag_hash,
+                    }
+                )
+            )
+
+            if result.rowcount == 0:
+                # No rows updated - serialized DAG doesn't exist
                 return False
-            # Update the serialized DAG with the new_serialized_dag
-            latest_ser_dag._data = new_serialized_dag._data
-            latest_ser_dag._data_compressed = new_serialized_dag._data_compressed
-            latest_ser_dag.dag_hash = new_serialized_dag.dag_hash
-            session.merge(latest_ser_dag)
             # The dag_version and dag_code may not have changed, still we should
             # do the below actions:
             # Update the latest dag version
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index e310836a3911a..ea9ff4cbfddcc 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -544,3 +544,87 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self):
         # Verify that the original data still has fileloc (method shouldn't modify original)
         assert "fileloc" in test_data["dag"]
         assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
+
+    def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
+        """
+        Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
+        This preserves the null-check fix from PR #56422 and tests the direct UPDATE path.
+ """ + with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag: + EmptyOperator(task_id="task1") + + # Write the DAG first + lazy_dag = LazyDeserializedDAG.from_dag(dag) + SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + # Get the dag_version + dag_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag").limit(1) + ) + assert dag_version is not None + + # Manually delete SerializedDagModel (simulates edge case) + session.query(SDM).filter(SDM.dag_id == "test_missing_serdag").delete() + session.commit() + + # Verify no SerializedDagModel exists + assert SDM.get("test_missing_serdag", session=session) is None + + # Try to update - should return False gracefully (not crash) + result = SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + min_update_interval=None, + session=session, + ) + + assert result is False # Should return False when SerializedDagModel is missing + + def test_dynamic_dag_update_success(self, dag_maker, session): + """ + Test that dynamic DAG update successfully updates the serialized DAG hash + when no task instances exist. + """ + with dag_maker(dag_id="test_dynamic_success", session=session) as dag: + EmptyOperator(task_id="task1") + + # Write the DAG first + lazy_dag = LazyDeserializedDAG.from_dag(dag) + result1 = SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + assert result1 is True + initial_sdag = SDM.get("test_dynamic_success", session=session) + assert initial_sdag is not None + initial_hash = initial_sdag.dag_hash + + # Modify the DAG (add a task) + EmptyOperator(task_id="task2", dag=dag) + lazy_dag_updated = LazyDeserializedDAG.from_dag(dag) + + # Write again - should use UPDATE path (no task instances yet) + result2 = SDM.write_dag( + dag=lazy_dag_updated, + bundle_name="test_bundle", + bundle_version=None, + session=session, + ) + session.commit() + + # Verify update succeeded + assert result2 is True + updated_sdag = SDM.get("test_dynamic_success", session=session) + assert updated_sdag.dag_hash != initial_hash # Hash should change + assert len(updated_sdag.dag.task_dict) == 2 # Should have 2 tasks now