diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 6a44ebbe715d1..871af8d3a553a 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1371,6 +1371,14 @@ def is_smart_sensor_compatible(self): """Return if this operator can use smart service. Default False.""" return False + @property + def inherits_from_dummy_operator(self): + """Used to determine if an Operator is inherited from DummyOperator""" + # This looks like `isinstance(self, DummyOperator) would work, but this also + # needs to cope when `self` is a Serialized instance of a DummyOperator or one + # of its sub-classes (which don't inherit from anything but BaseOperator). + return getattr(self, '_is_dummy', False) + def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]): r""" diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 8f8a1f7bc7099..98af00baa7e3a 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -724,7 +724,7 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) - ti for ti in schedulable_tis if ( - ti.task.task_type == "DummyOperator" + ti.task.inherits_from_dummy_operator and not ti.task.on_execute_callback and not ti.task.on_success_callback ) diff --git a/airflow/operators/dummy_operator.py b/airflow/operators/dummy_operator.py index 5592fac4188e7..13d27508f4f86 100644 --- a/airflow/operators/dummy_operator.py +++ b/airflow/operators/dummy_operator.py @@ -29,6 +29,7 @@ class DummyOperator(BaseOperator): """ ui_color = '#e8f7e4' + inherits_from_dummy_operator = True @apply_defaults def __init__(self, **kwargs) -> None: diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 9056eaab97d58..854b266df1236 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -158,7 +158,8 @@ "_downstream_task_ids": { "type": "array", "items": { "type": "string" } - } + }, + "_is_dummy": { "type": "boolean" } }, "additionalProperties": true }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 7ce5e1a1d7f3c..e9b5cf268819e 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -350,6 +350,10 @@ def serialize_operator(cls, op: BaseOperator) -> dict: serialize_op = cls.serialize_to_json(op, cls._decorated_fields) serialize_op['_task_type'] = op.__class__.__name__ serialize_op['_task_module'] = op.__class__.__module__ + + # Used to determine if an Operator is inherited from DummyOperator + serialize_op['_is_dummy'] = op.inherits_from_dummy_operator + if op.operator_extra_links: serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links( op.operator_extra_links @@ -432,6 +436,9 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: if not hasattr(op, field): setattr(op, field, None) + # Used to determine if an Operator is inherited from DummyOperator + setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False))) + return op @classmethod diff --git a/tests/dags/test_only_dummy_tasks.py b/tests/dags/test_only_dummy_tasks.py index 8b0f04d8e5c1c..ba96b73151b4d 100644 --- a/tests/dags/test_only_dummy_tasks.py +++ b/tests/dags/test_only_dummy_tasks.py @@ -29,6 +29,16 @@ dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') + +class MyDummyOperator(DummyOperator): + template_fields_renderers = {"body": "json"} + template_fields = ("body",) + + def __init__(self, body, *args, **kwargs): + super().__init__(*args, **kwargs) + self.body = body + + with dag: task_a = DummyOperator(task_id="test_task_a") @@ -36,7 +46,7 @@ task_a >> task_b - task_c = DummyOperator(task_id="test_task_c") + task_c = MyDummyOperator(task_id="test_task_c", body={"hello": "world"}) task_d = DummyOperator(task_id="test_task_on_execute", on_execute_callback=lambda *args, **kwargs: 1) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index f68fd2e37e2f1..97c0647209930 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -86,6 +86,7 @@ "retry_delay": 300.0, "_downstream_task_ids": [], "_inlets": [], + "_is_dummy": False, "_outlets": [], "ui_color": "#f0ede4", "ui_fgcolor": "#000", @@ -112,6 +113,7 @@ "retry_delay": 300.0, "_downstream_task_ids": [], "_inlets": [], + "_is_dummy": False, "_outlets": [], "_operator_extra_links": [{"tests.test_utils.mock_operators.CustomOpLink": {}}], "ui_color": "#fff",