From c7dcc10726294f94410fd72e97530377e733ea1b Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 1 Dec 2020 19:24:03 +0000 Subject: [PATCH 1/5] Optimize subclasses of DummyOperator for Scheduling Custom operators inheriting from DummyOperator will now be set straight to success, instead of going to a scheduled state, if they don't have callbacks set. closes https://github.com/apache/airflow/issues/11393 --- airflow/models/dagrun.py | 2 +- airflow/serialization/schema.json | 3 ++- airflow/serialization/serialized_objects.py | 15 +++++++++++++++ tests/dags/test_only_dummy_tasks.py | 12 +++++++++++- 4 files changed, 29 insertions(+), 3 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 8f8a1f7bc7099..2ca8beeaf5344 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -724,7 +724,7 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) - ti for ti in schedulable_tis if ( - ti.task.task_type == "DummyOperator" + (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False)) and not ti.task.on_execute_callback and not ti.task.on_success_callback ) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index 9056eaab97d58..854b266df1236 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -158,7 +158,8 @@ "_downstream_task_ids": { "type": "array", "items": { "type": "string" } - } + }, + "_is_dummy": { "type": "boolean" } }, "additionalProperties": true }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 7ce5e1a1d7f3c..b31ef259ed30d 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -31,6 +31,7 @@ from airflow.models.baseoperator import BaseOperator, BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG +from airflow.operators.dummy_operator import
DummyOperator from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.helpers import serialize_template_field from airflow.serialization.json_schema import Validator, load_dag_schema @@ -350,6 +351,10 @@ def serialize_operator(cls, op: BaseOperator) -> dict: serialize_op = cls.serialize_to_json(op, cls._decorated_fields) serialize_op['_task_type'] = op.__class__.__name__ serialize_op['_task_module'] = op.__class__.__module__ + + # Used to determine if an Operator is inherited from DummyOperator + serialize_op['_is_dummy'] = cls._is_inherited_from_dummy_operator(op) + if op.operator_extra_links: serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links( op.operator_extra_links @@ -432,6 +437,9 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: if not hasattr(op, field): setattr(op, field, None) + # Used to determine if an Operator is inherited from DummyOperator + setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy"))) + return op @classmethod @@ -532,6 +540,13 @@ def _serialize_operator_extra_links(cls, operator_extra_links: Iterable[BaseOper return serialize_operator_extra_links + @classmethod + def _is_inherited_from_dummy_operator(cls, op: BaseOperator) -> bool: + """Used to determine if an Operator is inherited from DummyOperator""" + if op.task_type == "DummyOperator" or isinstance(op, DummyOperator): + return True + return False + class SerializedDAG(DAG, BaseSerialization): """ diff --git a/tests/dags/test_only_dummy_tasks.py b/tests/dags/test_only_dummy_tasks.py index 8b0f04d8e5c1c..ba96b73151b4d 100644 --- a/tests/dags/test_only_dummy_tasks.py +++ b/tests/dags/test_only_dummy_tasks.py @@ -29,6 +29,16 @@ dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once') + +class MyDummyOperator(DummyOperator): + template_fields_renderers = {"body": "json"} + template_fields = ("body",) + + def __init__(self, body, *args, **kwargs): + 
super().__init__(*args, **kwargs) + self.body = body + + with dag: task_a = DummyOperator(task_id="test_task_a") @@ -36,7 +46,7 @@ task_a >> task_b - task_c = DummyOperator(task_id="test_task_c") + task_c = MyDummyOperator(task_id="test_task_c", body={"hello": "world"}) task_d = DummyOperator(task_id="test_task_on_execute", on_execute_callback=lambda *args, **kwargs: 1) From 67b5daf93d7a791670966cea806429155a303041 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 1 Dec 2020 20:12:37 +0000 Subject: [PATCH 2/5] fixup! Optimize subclasses of DummyOperator for Scheduling --- airflow/serialization/serialized_objects.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index b31ef259ed30d..849ae279553b9 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -438,7 +438,7 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: setattr(op, field, None) # Used to determine if an Operator is inherited from DummyOperator - setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy"))) + setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False))) return op From 0b47a1b9d251a0ac84a366892701585a831fc892 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 1 Dec 2020 20:32:03 +0000 Subject: [PATCH 3/5] fixup! fixup! Optimize subclasses of DummyOperator for Scheduling --- airflow/models/baseoperator.py | 5 +++++ airflow/models/dagrun.py | 2 +- airflow/operators/dummy_operator.py | 1 + airflow/serialization/serialized_objects.py | 10 +--------- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 6a44ebbe715d1..ed361262303f4 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1371,6 +1371,11 @@ def is_smart_sensor_compatible(self): """Return if this operator can use smart service. 
Default False.""" return False + @property + def inherits_from_dummy_operator(self): + """Used to determine if an Operator is inherited from DummyOperator""" + return getattr(self, '_is_dummy', False) + def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]): r""" diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 2ca8beeaf5344..98af00baa7e3a 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -724,7 +724,7 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) - ti for ti in schedulable_tis if ( - (ti.task.task_type == "DummyOperator" or getattr(ti.task, "_is_dummy", False)) + ti.task.inherits_from_dummy_operator and not ti.task.on_execute_callback and not ti.task.on_success_callback ) diff --git a/airflow/operators/dummy_operator.py b/airflow/operators/dummy_operator.py index 5592fac4188e7..13d27508f4f86 100644 --- a/airflow/operators/dummy_operator.py +++ b/airflow/operators/dummy_operator.py @@ -29,6 +29,7 @@ class DummyOperator(BaseOperator): """ ui_color = '#e8f7e4' + inherits_from_dummy_operator = True @apply_defaults def __init__(self, **kwargs) -> None: diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 849ae279553b9..e9b5cf268819e 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -31,7 +31,6 @@ from airflow.models.baseoperator import BaseOperator, BaseOperatorLink from airflow.models.connection import Connection from airflow.models.dag import DAG -from airflow.operators.dummy_operator import DummyOperator from airflow.serialization.enums import DagAttributeTypes as DAT, Encoding from airflow.serialization.helpers import serialize_template_field from airflow.serialization.json_schema import Validator, load_dag_schema @@ -353,7 +352,7 @@ def serialize_operator(cls, op: BaseOperator) -> dict: serialize_op['_task_module'] = op.__class__.__module__ # Used to determine if 
an Operator is inherited from DummyOperator - serialize_op['_is_dummy'] = cls._is_inherited_from_dummy_operator(op) + serialize_op['_is_dummy'] = op.inherits_from_dummy_operator if op.operator_extra_links: serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links( @@ -540,13 +539,6 @@ def _serialize_operator_extra_links(cls, operator_extra_links: Iterable[BaseOper return serialize_operator_extra_links - @classmethod - def _is_inherited_from_dummy_operator(cls, op: BaseOperator) -> bool: - """Used to determine if an Operator is inherited from DummyOperator""" - if op.task_type == "DummyOperator" or isinstance(op, DummyOperator): - return True - return False - class SerializedDAG(DAG, BaseSerialization): """ From b150a6a60be8a2cffd22069e797a48e30d41967f Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 1 Dec 2020 20:38:46 +0000 Subject: [PATCH 4/5] fixup! fixup! fixup! Optimize subclasses of DummyOperator for Scheduling --- airflow/models/baseoperator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index ed361262303f4..871af8d3a553a 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -1374,6 +1374,9 @@ def is_smart_sensor_compatible(self): @property def inherits_from_dummy_operator(self): """Used to determine if an Operator is inherited from DummyOperator""" + # This looks like `isinstance(self, DummyOperator)` would work, but this also + # needs to cope when `self` is a Serialized instance of a DummyOperator or one + # of its sub-classes (which don't inherit from anything but BaseOperator). return getattr(self, '_is_dummy', False) From 4981963b3cfa30104351c7e17c433bfad73a23a3 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 2 Dec 2020 10:38:21 +0000 Subject: [PATCH 5/5] fixup! fixup! fixup! fixup!
Optimize subclasses of DummyOperator for Scheduling --- tests/serialization/test_dag_serialization.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index f68fd2e37e2f1..97c0647209930 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -86,6 +86,7 @@ "retry_delay": 300.0, "_downstream_task_ids": [], "_inlets": [], + "_is_dummy": False, "_outlets": [], "ui_color": "#f0ede4", "ui_fgcolor": "#000", @@ -112,6 +113,7 @@ "retry_delay": 300.0, "_downstream_task_ids": [], "_inlets": [], + "_is_dummy": False, "_outlets": [], "_operator_extra_links": [{"tests.test_utils.mock_operators.CustomOpLink": {}}], "ui_color": "#fff",