Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,6 +1371,14 @@ def is_smart_sensor_compatible(self):
"""Return if this operator can use smart service. Default False."""
return False

@property
def inherits_from_dummy_operator(self):
"""Used to determine if an Operator is inherited from DummyOperator"""
# This looks like `isinstance(self, DummyOperator) would work, but this also
# needs to cope when `self` is a Serialized instance of a DummyOperator or one
# of its sub-classes (which don't inherit from anything but BaseOperator).
return getattr(self, '_is_dummy', False)


def chain(*tasks: Union[BaseOperator, Sequence[BaseOperator]]):
r"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = None) -
ti
for ti in schedulable_tis
if (
ti.task.task_type == "DummyOperator"
ti.task.inherits_from_dummy_operator
and not ti.task.on_execute_callback
and not ti.task.on_success_callback
)
Expand Down
1 change: 1 addition & 0 deletions airflow/operators/dummy_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class DummyOperator(BaseOperator):
"""

ui_color = '#e8f7e4'
inherits_from_dummy_operator = True

@apply_defaults
def __init__(self, **kwargs) -> None:
Expand Down
3 changes: 2 additions & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@
"_downstream_task_ids": {
"type": "array",
"items": { "type": "string" }
}
},
"_is_dummy": { "type": "boolean" }
},
"additionalProperties": true
},
Expand Down
7 changes: 7 additions & 0 deletions airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
serialize_op = cls.serialize_to_json(op, cls._decorated_fields)
serialize_op['_task_type'] = op.__class__.__name__
serialize_op['_task_module'] = op.__class__.__module__

# Used to determine if an Operator is inherited from DummyOperator
serialize_op['_is_dummy'] = op.inherits_from_dummy_operator

if op.operator_extra_links:
serialize_op['_operator_extra_links'] = cls._serialize_operator_extra_links(
op.operator_extra_links
Expand Down Expand Up @@ -432,6 +436,9 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator:
if not hasattr(op, field):
setattr(op, field, None)

# Used to determine if an Operator is inherited from DummyOperator
setattr(op, "_is_dummy", bool(encoded_op.get("_is_dummy", False)))

return op

@classmethod
Expand Down
12 changes: 11 additions & 1 deletion tests/dags/test_only_dummy_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,24 @@

dag = DAG(dag_id="test_only_dummy_tasks", default_args=default_args, schedule_interval='@once')


Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DAG is used by test_should_mark_dummy_task_as_success test in tests/jobs/test_scheduler_job.py

class MyDummyOperator(DummyOperator):
template_fields_renderers = {"body": "json"}
template_fields = ("body",)

def __init__(self, body, *args, **kwargs):
super().__init__(*args, **kwargs)
self.body = body


with dag:
task_a = DummyOperator(task_id="test_task_a")

task_b = DummyOperator(task_id="test_task_b")

task_a >> task_b

task_c = DummyOperator(task_id="test_task_c")
task_c = MyDummyOperator(task_id="test_task_c", body={"hello": "world"})

task_d = DummyOperator(task_id="test_task_on_execute", on_execute_callback=lambda *args, **kwargs: 1)

Expand Down
2 changes: 2 additions & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
"retry_delay": 300.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
"_outlets": [],
"ui_color": "#f0ede4",
"ui_fgcolor": "#000",
Expand All @@ -112,6 +113,7 @@
"retry_delay": 300.0,
"_downstream_task_ids": [],
"_inlets": [],
"_is_dummy": False,
"_outlets": [],
"_operator_extra_links": [{"tests.test_utils.mock_operators.CustomOpLink": {}}],
"ui_color": "#fff",
Expand Down