Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow import __version__ as AIRFLOW_VERSION

# TODO: move this maybe to Airflow's logic?
from airflow.models import DagRun, TaskReschedule
from airflow.models import DagRun, TaskInstance, TaskReschedule
from airflow.models.mappedoperator import MappedOperator as SerializedMappedOperator
from airflow.providers.common.compat.assets import Asset
from airflow.providers.common.compat.sdk import DAG, BaseOperator, BaseSensorOperator, MappedOperator
Expand Down Expand Up @@ -68,7 +68,6 @@
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet, processing_engine_run

from airflow.models import TaskInstance
from airflow.sdk.execution_time.secrets_masker import (
Redactable,
Redacted,
Expand Down Expand Up @@ -782,25 +781,32 @@ def _get_task_groups_details(dag: DAG | SerializedDAG) -> dict:
def _emits_ol_events(task: AnyOperator) -> bool:
config_selective_enabled = is_selective_lineage_enabled(task)
config_disabled_for_operators = is_operator_disabled(task)
# empty operators without callbacks/outlets are skipped for optimization by Airflow
# in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks
is_skipped_as_empty_operator = all(
(
task.inherits_from_empty_operator,
not getattr(task, "on_execute_callback", None),
not getattr(task, "on_success_callback", None),
not task.outlets,
not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773
not (
getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),
not (
getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),

is_task_schedulable_method = getattr(TaskInstance, "is_task_schedulable", None) # Added in 3.2.0 #56039
if is_task_schedulable_method and callable(is_task_schedulable_method):
is_skipped_as_empty_operator = not is_task_schedulable_method(task)
else:
# For older Airflow versions, re-create Airflow core internal logic as
# empty operators without callbacks/outlets are skipped for optimization by Airflow
# in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks or
# airflow.models.dagrun.DagRun.schedule_tis, depending on Airflow version
is_skipped_as_empty_operator = all(
(
task.inherits_from_empty_operator,
not getattr(task, "on_execute_callback", None),
not getattr(task, "on_success_callback", None),
not task.outlets,
not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773
not (
getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),
not (
getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),
)
)
)

emits_ol_events = all(
(
Expand Down