diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index cadd701248e73..b2e3a7b591387 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -758,7 +758,7 @@ def serialize( return var.to_dict() elif isinstance(var, MappedOperator): return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP) - elif isinstance(var, (BaseOperator, SerializedBaseOperator)): + elif isinstance(var, BaseOperator): var._needs_expansion = var.get_needs_expansion() return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP) elif isinstance(var, cls._datetime_types): @@ -1372,11 +1372,11 @@ def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]: return serialized_op @classmethod - def serialize_operator(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]: + def serialize_operator(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]: return cls._serialize_node(op) @classmethod - def _serialize_node(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]: + def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]: """Serialize operator into a JSON object.""" serialize_op = cls.serialize_to_json(op, cls._decorated_fields) diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index f12018d87e2a6..2b10c6a929196 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3423,7 +3423,7 @@ def test_verify_integrity_if_dag_not_changed(self, dag_maker, session): session = settings.Session() orm_dag = dag_maker.dag_model assert orm_dag is not None - SerializedDagModel.write_dag(dag, bundle_name="testing") + scheduler_job = Job() self.job_runner = SchedulerJobRunner(job=scheduler_job) @@ -3465,7 +3465,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker): SerializedDagModel.dag_id == "test_verify_integrity_if_dag_changed" ).delete(synchronize_session=False) - with dag_maker(dag_id="test_verify_integrity_if_dag_changed") as dag: + with dag_maker(dag_id="test_verify_integrity_if_dag_changed", serialized=False) as dag: BashOperator(task_id="dummy", bash_command="echo hi") scheduler_job = Job() @@ -5756,7 +5756,7 @@ def test_scheduler_job_add_new_task(self, dag_maker): """ Test if a task instance will be added if the dag is updated """ - with dag_maker(dag_id="test_scheduler_add_new_task") as dag: + with dag_maker(dag_id="test_scheduler_add_new_task", serialized=False) as dag: BashOperator(task_id="dummy", bash_command="echo test") scheduler_job = Job() diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py index cf56a42211fa3..86ab66f66bc09 100644 --- a/airflow-core/tests/unit/serialization/test_serialized_objects.py +++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py @@ -563,18 +563,6 @@ def test_roundtrip_exceptions(): assert deser.timeout == timedelta(seconds=30) -@pytest.mark.db_test -def test_serialized_dag_to_dict_and_from_dict_gives_same_result_in_tasks(dag_maker): - with dag_maker() as dag: - BashOperator(task_id="task1", bash_command="echo 1") - - dag1 = SerializedDAG.to_dict(dag) - from_dict = SerializedDAG.from_dict(dag1) - dag2 = SerializedDAG.to_dict(from_dict) - - assert dag2["dag"]["tasks"][0]["__var"].keys() == dag1["dag"]["tasks"][0]["__var"].keys() - - @pytest.mark.parametrize( "concurrency_parameter", [ @@ -643,7 +631,7 @@ def test_serialized_dag_get_run_data_interval(create_dag_run_kwargs, dag_maker, pre-AIP-39: the dag run itself has neither data_interval_start nor data_interval_end, and its logical_date is none. it should return data_interval as none """ - with dag_maker(dag_id="test_dag", session=session, serialized=True) as dag: + with dag_maker(dag_id="test_dag", session=session, serialized=False) as dag: BaseOperator(task_id="test_task") session.commit()