Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions airflow-core/src/airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def translate_tuples_to_lists(obj: Any):
return {key: translate_tuples_to_lists(value) for key, value in obj.items()}
return obj

def sort_dict_recursively(obj: Any) -> Any:
    """
    Return a copy of ``obj`` in which every dictionary is ordered by key.

    Dictionaries are rebuilt with their keys in sorted order at any nesting
    depth inside dicts, lists and tuples. Lists and tuples keep their element
    order and their type. Any other value is returned unchanged.

    Keys are ordered with their natural comparison. When the keys of a single
    dictionary cannot be compared with each other (for example ``int`` and
    ``str`` keys mixed, or a ``None`` key), they are ordered by type name and
    ``repr`` instead, so the result stays deterministic instead of raising
    ``TypeError``.
    """
    if isinstance(obj, dict):
        try:
            # Sort on keys only; values may be unorderable (nested dicts, None).
            ordered_keys = sorted(obj)
        except TypeError:
            # Mixed key types: fall back to an order that is always defined.
            ordered_keys = sorted(obj, key=lambda k: (type(k).__name__, repr(k)))
        return {key: sort_dict_recursively(obj[key]) for key in ordered_keys}
    if isinstance(obj, list):
        return [sort_dict_recursively(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(sort_dict_recursively(item) for item in obj)
    return obj

max_length = conf.getint("core", "max_templated_field_length")

if not is_jsonable(template_field):
Expand All @@ -70,6 +80,10 @@ def translate_tuples_to_lists(obj: Any):
# and need to be converted to lists
return template_field
template_field = translate_tuples_to_lists(template_field)
# Sort dictionaries recursively to ensure consistent string representation
# This prevents hash inconsistencies when dict ordering varies
if isinstance(template_field, dict):
template_field = sort_dict_recursively(template_field)
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
Expand Down
53 changes: 53 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,59 @@ def test_hash_method_removes_fileloc_and_remains_consistent(self):
assert "fileloc" in test_data["dag"]
assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"

def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker):
    """
    Check that dict insertion order in a template field does not change the DAG hash.

    Two DAGs are built that differ only in the insertion order of the
    templated ``env_vars`` dict. Their serialized ``env_vars`` and their
    ``SDM.hash`` values must be identical.
    """
    from airflow.sdk.bases.operator import BaseOperator

    class MyCustomOp(BaseOperator):
        template_fields = ("env_vars",)

        def __init__(self, *, task_id: str, **kwargs):
            super().__init__(task_id=task_id, **kwargs)
            self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"}

    def serialized_env_vars(serialized_dag):
        """Return the serialized ``env_vars`` of ``task1``, or None when absent."""
        for serialized_task in serialized_dag["dag"]["tasks"]:
            if serialized_task["__var"]["task_id"] == "task1":
                return serialized_task["__var"].get("env_vars")
        return None

    # Create first DAG with env_vars in one order
    with dag_maker("test_dag") as dag1:
        MyCustomOp(task_id="task1")

    serialized_dag_1 = SerializedDAG.to_dict(dag1)

    # Create second DAG with env_vars in different order
    with dag_maker("test_dag") as dag2:
        task = MyCustomOp(task_id="task1")
        # Recreate dict with different insertion order
        task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": "value2"}

    serialized_dag_2 = SerializedDAG.to_dict(dag2)

    # Verify that the serialized env_vars no longer reflect the insertion order
    env_vars_1 = serialized_env_vars(serialized_dag_1)
    env_vars_2 = serialized_env_vars(serialized_dag_2)

    assert env_vars_1 is not None, "serialized_dag_1 should have env_vars"
    assert env_vars_2 is not None, "serialized_dag_2 should have env_vars"

    # The serialized value may be a dict or a string; whichever it is, both
    # DAGs must agree. Comparing unconditionally also fails on a type mismatch
    # instead of silently skipping the check.
    assert env_vars_1 == env_vars_2, "Serialized env_vars should be equal regardless of original order"
    for env_vars in (env_vars_1, env_vars_2):
        if isinstance(env_vars, dict):
            # dict equality ignores ordering, so check the key order explicitly
            assert list(env_vars.keys()) == sorted(env_vars.keys())

    hash_1 = SDM.hash(serialized_dag_1)
    hash_2 = SDM.hash(serialized_dag_2)

    # Hashes should be identical
    assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently"

def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
"""
Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
Expand Down