Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,6 @@ def serialize(
return var.to_dict()
elif isinstance(var, MappedOperator):
return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)
elif isinstance(var, (BaseOperator, SerializedBaseOperator)):
var._needs_expansion = var.get_needs_expansion()
return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
elif isinstance(var, cls._datetime_types):
return cls._encode(var.timestamp(), type_=DAT.DATETIME)
elif isinstance(var, datetime.timedelta):
Expand Down Expand Up @@ -1359,11 +1356,11 @@ def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
return serialized_op

@classmethod
def serialize_operator(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]:
def serialize_operator(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
return cls._serialize_node(op)

@classmethod
def _serialize_node(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]:
def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
"""Serialize operator into a JSON object."""
serialize_op = cls.serialize_to_json(op, cls._decorated_fields)

Expand Down
18 changes: 0 additions & 18 deletions airflow-core/tests/unit/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1530,24 +1530,6 @@ def test_task_group_serialization(self):
assert serialized_dag.task_group.children
assert serialized_dag.task_group.children.keys() == dag.task_group.children.keys()

def check_task_group(node):
assert node.dag is serialized_dag
try:
children = node.children.values()
except AttributeError:
# Round-trip serialization and check the result
expected_serialized = SerializedBaseOperator.serialize_operator(dag.get_task(node.task_id))
expected_deserialized = SerializedBaseOperator.deserialize_operator(expected_serialized)
expected_dict = SerializedBaseOperator.serialize_operator(expected_deserialized)
assert node
assert SerializedBaseOperator.serialize_operator(node) == expected_dict
return

for child in children:
check_task_group(child)

check_task_group(serialized_dag.task_group)

@staticmethod
def assert_taskgroup_children(se_task_group, dag_task_group, expected_children):
assert se_task_group.children.keys() == dag_task_group.children.keys() == expected_children
Expand Down
Loading