Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ def serialize(
return var.to_dict()
elif isinstance(var, MappedOperator):
return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)
elif isinstance(var, (BaseOperator, SerializedBaseOperator)):
elif isinstance(var, BaseOperator):
var._needs_expansion = var.get_needs_expansion()
return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
elif isinstance(var, cls._datetime_types):
Expand Down Expand Up @@ -1372,11 +1372,11 @@ def serialize_mapped_operator(cls, op: MappedOperator) -> dict[str, Any]:
return serialized_op

@classmethod
def serialize_operator(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]:
def serialize_operator(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
return cls._serialize_node(op)

@classmethod
def _serialize_node(cls, op: BaseOperator | MappedOperator | SerializedBaseOperator) -> dict[str, Any]:
def _serialize_node(cls, op: BaseOperator | MappedOperator) -> dict[str, Any]:
"""Serialize operator into a JSON object."""
serialize_op = cls.serialize_to_json(op, cls._decorated_fields)

Expand Down
6 changes: 3 additions & 3 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3423,7 +3423,7 @@ def test_verify_integrity_if_dag_not_changed(self, dag_maker, session):
session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None
SerializedDagModel.write_dag(dag, bundle_name="testing")

scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job)

Expand Down Expand Up @@ -3465,7 +3465,7 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_changed"
).delete(synchronize_session=False)

with dag_maker(dag_id="test_verify_integrity_if_dag_changed") as dag:
with dag_maker(dag_id="test_verify_integrity_if_dag_changed", serialized=False) as dag:
BashOperator(task_id="dummy", bash_command="echo hi")

scheduler_job = Job()
Expand Down Expand Up @@ -5756,7 +5756,7 @@ def test_scheduler_job_add_new_task(self, dag_maker):
"""
Test if a task instance will be added if the dag is updated
"""
with dag_maker(dag_id="test_scheduler_add_new_task") as dag:
with dag_maker(dag_id="test_scheduler_add_new_task", serialized=False) as dag:
BashOperator(task_id="dummy", bash_command="echo test")

scheduler_job = Job()
Expand Down
14 changes: 1 addition & 13 deletions airflow-core/tests/unit/serialization/test_serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,18 +563,6 @@ def test_roundtrip_exceptions():
assert deser.timeout == timedelta(seconds=30)


@pytest.mark.db_test
def test_serialized_dag_to_dict_and_from_dict_gives_same_result_in_tasks(dag_maker):
with dag_maker() as dag:
BashOperator(task_id="task1", bash_command="echo 1")

dag1 = SerializedDAG.to_dict(dag)
from_dict = SerializedDAG.from_dict(dag1)
dag2 = SerializedDAG.to_dict(from_dict)

assert dag2["dag"]["tasks"][0]["__var"].keys() == dag1["dag"]["tasks"][0]["__var"].keys()


@pytest.mark.parametrize(
"concurrency_parameter",
[
Expand Down Expand Up @@ -643,7 +631,7 @@ def test_serialized_dag_get_run_data_interval(create_dag_run_kwargs, dag_maker,
pre-AIP-39: the dag run itself has neither data_interval_start nor data_interval_end, and its logical_date
is none. it should return data_interval as none
"""
with dag_maker(dag_id="test_dag", session=session, serialized=True) as dag:
with dag_maker(dag_id="test_dag", session=session, serialized=False) as dag:
BaseOperator(task_id="test_task")
session.commit()

Expand Down
Loading