diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py
index 949b3cb9c9f09..5f564df6188ed 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -51,6 +51,16 @@ def translate_tuples_to_lists(obj: Any):
             return {key: translate_tuples_to_lists(value) for key, value in obj.items()}
         return obj
 
+    def sort_dict_recursively(obj: Any) -> Any:
+        """Recursively sort dictionaries to ensure consistent ordering."""
+        if isinstance(obj, dict):
+            return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())}
+        if isinstance(obj, list):
+            return [sort_dict_recursively(item) for item in obj]
+        if isinstance(obj, tuple):
+            return tuple(sort_dict_recursively(item) for item in obj)
+        return obj
+
     max_length = conf.getint("core", "max_templated_field_length")
 
     if not is_jsonable(template_field):
@@ -70,6 +80,10 @@ def translate_tuples_to_lists(obj: Any):
         # and need to be converted to lists
         return template_field
     template_field = translate_tuples_to_lists(template_field)
+    # Sort dictionaries recursively to ensure a consistent string representation.
+    # This prevents hash inconsistencies when dict ordering varies.
+    if isinstance(template_field, dict):
+        template_field = sort_dict_recursively(template_field)
     serialized = str(template_field)
     if len(serialized) > max_length:
         rendered = redact(serialized, name)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 92446cee2189c..a81ecaf13fcc8 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -545,6 +545,59 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self):
         assert "fileloc" in test_data["dag"]
         assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
 
+    def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker):
+        from airflow.sdk.bases.operator import BaseOperator
+
+        class MyCustomOp(BaseOperator):
+            template_fields = ("env_vars",)
+
+            def __init__(self, *, task_id: str, **kwargs):
+                super().__init__(task_id=task_id, **kwargs)
+                self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"}
+
+        # Create first DAG with env_vars in one order
+        with dag_maker("test_dag") as dag1:
+            MyCustomOp(task_id="task1")
+
+        serialized_dag_1 = SerializedDAG.to_dict(dag1)
+
+        # Create second DAG with env_vars in a different order
+        with dag_maker("test_dag") as dag2:
+            task = MyCustomOp(task_id="task1")
+            # Recreate the dict with a different insertion order
+            task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": "value2"}
+
+        serialized_dag_2 = SerializedDAG.to_dict(dag2)
+
+        # Verify that the original env_vars have different ordering
+        env_vars_1 = None
+        env_vars_2 = None
+        for task in serialized_dag_1["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_1 = task["__var"].get("env_vars")
+        for task in serialized_dag_2["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_2 = task["__var"].get("env_vars")
+
+        assert env_vars_1 is not None, "serialized_dag_1 should have env_vars"
+        assert env_vars_2 is not None, "serialized_dag_2 should have env_vars"
+        # The serialized env_vars should be sorted dicts (or strings if truncated).
+        # If they're dicts, verify they're sorted; if strings, they should be equal due to sorting.
+        if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict):
+            # Both should be sorted dictionaries with the same content
+            assert list(env_vars_1.keys()) == sorted(env_vars_1.keys())
+            assert list(env_vars_2.keys()) == sorted(env_vars_2.keys())
+            assert env_vars_1 == env_vars_2, "Sorted dicts should be equal regardless of original order"
+        elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str):
+            # If truncated to strings, they should be equal due to sorting
+            assert env_vars_1 == env_vars_2, "String representations should be equal due to sorting"
+
+        hash_1 = SDM.hash(serialized_dag_1)
+        hash_2 = SDM.hash(serialized_dag_2)
+
+        # Hashes should be identical
+        assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently"
+
     def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
         """
         Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.