From a7ebbb924f77c8ad3ef28b71a076d5e042a982f5 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 28 Aug 2025 01:55:09 +0100 Subject: [PATCH] Remove support for re-serializing a Serialized Operator Re-adding https://github.com/apache/airflow/pull/54428 -- one of the recent refactors undid the changes. --- .../src/airflow/serialization/serialized_objects.py | 12 +++++------- airflow-core/tests/unit/models/test_cleartasks.py | 1 - .../unit/serialization/test_serialized_objects.py | 10 ++++++++++ 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 6cbf73897038c..72d27e651525f 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -717,8 +717,6 @@ def serialize( :meta private: """ - from airflow.models.mappedoperator import MappedOperator as SchedulerMappedOperator - if cls._is_primitive(var): # enum.IntEnum is an int instance, it causes json dumps error so we use its value. if isinstance(var, enum.Enum): @@ -758,9 +756,9 @@ def serialize( return cls._encode(DeadlineAlert.serialize_deadline_alert(var), type_=DAT.DEADLINE_ALERT) elif isinstance(var, Resources): return var.to_dict() - elif isinstance(var, (MappedOperator, SchedulerMappedOperator)): + elif isinstance(var, MappedOperator): return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP) - elif isinstance(var, (BaseOperator, SerializedBaseOperator)): + elif isinstance(var, BaseOperator): var._needs_expansion = var.get_needs_expansion() return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP) elif isinstance(var, cls._datetime_types): @@ -1358,7 +1356,7 @@ def __getattr__(self, name): raise AttributeError(f"'{self.task_type}' object has no attribute '{name}'") @classmethod - def serialize_mapped_operator(cls, op: MappedOperator | SchedulerMappedOperator) -> dict[str, Any]: + def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]: serialized_op = cls._serialize_node(op) # Handle expand_input and op_kwargs_expand_input. expansion_kwargs = op._get_specified_expand_input() @@ -1384,11 +1382,11 @@ def serialize_mapped_operator(cls, op: MappedOperator | SchedulerMappedOperator) return serialized_op @classmethod - def serialize_operator(cls, op: SdkOperator | SchedulerOperator) -> dict[str, Any]: + def serialize_operator(cls, op: SdkOperator) -> dict[str, Any]: return cls._serialize_node(op) @classmethod - def _serialize_node(cls, op: SdkOperator | SchedulerOperator) -> dict[str, Any]: + def _serialize_node(cls, op: SdkOperator) -> dict[str, Any]: """Serialize operator into a JSON object.""" serialize_op = cls.serialize_to_json(op, cls._decorated_fields) diff --git a/airflow-core/tests/unit/models/test_cleartasks.py b/airflow-core/tests/unit/models/test_cleartasks.py index 8e4e23a7bba17..2d70e8a793c98 100644 --- a/airflow-core/tests/unit/models/test_cleartasks.py +++ b/airflow-core/tests/unit/models/test_cleartasks.py @@ -713,7 +713,6 @@ def test_clear_task_instances_with_run_on_latest_version(self, run_on_latest_ver bundle_version="v2", ) as dag: EmptyOperator(task_id="0") - SerializedDagModel.write_dag(dag=dag, bundle_name="dag_maker", bundle_version="v2") new_dag_version = DagVersion.get_latest_version(dag.dag_id) assert old_dag_version.id != new_dag_version.id diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index 7aededd12bdf2..6b6fa9a52602e 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -159,6 +159,16 @@ class Test: BaseSerialization.serialize(obj, strict=True) # now raises +def test_prevent_re_serialization_of_serialized_operators(): + """SerializedBaseOperator should not be re-serializable.""" + from airflow.serialization.serialized_objects import BaseSerialization, SerializedBaseOperator + + serialized_op = SerializedBaseOperator(task_id="test_task") + + with pytest.raises(SerializationError, match="Encountered unexpected type"): + BaseSerialization.serialize(serialized_op, strict=True) + + def test_validate_schema(): from airflow.serialization.serialized_objects import BaseSerialization