diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 8ef5c23ced88c..bd3901e84e76c 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -33,7 +33,7 @@ from airflow import __version__ as AIRFLOW_VERSION # TODO: move this maybe to Airflow's logic? -from airflow.models import DagRun, TaskReschedule +from airflow.models import DagRun, TaskInstance, TaskReschedule from airflow.models.mappedoperator import MappedOperator as SerializedMappedOperator from airflow.providers.common.compat.assets import Asset from airflow.providers.common.compat.sdk import DAG, BaseOperator, BaseSensorOperator, MappedOperator @@ -68,7 +68,6 @@ from openlineage.client.event_v2 import Dataset as OpenLineageDataset from openlineage.client.facet_v2 import RunFacet, processing_engine_run - from airflow.models import TaskInstance from airflow.sdk.execution_time.secrets_masker import ( Redactable, Redacted, @@ -782,25 +781,32 @@ def _get_task_groups_details(dag: DAG | SerializedDAG) -> dict: def _emits_ol_events(task: AnyOperator) -> bool: config_selective_enabled = is_selective_lineage_enabled(task) config_disabled_for_operators = is_operator_disabled(task) - # empty operators without callbacks/outlets are skipped for optimization by Airflow - # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks - is_skipped_as_empty_operator = all( - ( - task.inherits_from_empty_operator, - not getattr(task, "on_execute_callback", None), - not getattr(task, "on_success_callback", None), - not task.outlets, - not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773 - not ( - getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569 - and get_base_airflow_version_tuple() >= (3, 1, 0) - ), - not ( - getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569 - and get_base_airflow_version_tuple() >= (3, 1, 0) - ), + + is_task_schedulable_method = getattr(TaskInstance, "is_task_schedulable", None) # Added in 3.2.0 #56039 + if is_task_schedulable_method and callable(is_task_schedulable_method): + is_skipped_as_empty_operator = not is_task_schedulable_method(task) + else: + # For older Airflow versions, re-create Airflow core internal logic as + # empty operators without callbacks/outlets are skipped for optimization by Airflow + # in airflow.models.taskinstance.TaskInstance._schedule_downstream_tasks or + # airflow.models.dagrun.DagRun.schedule_tis, depending on Airflow version + is_skipped_as_empty_operator = all( + ( + task.inherits_from_empty_operator, + not getattr(task, "on_execute_callback", None), + not getattr(task, "on_success_callback", None), + not task.outlets, + not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773 + not ( + getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569 + and get_base_airflow_version_tuple() >= (3, 1, 0) + ), + not ( + getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569 + and get_base_airflow_version_tuple() >= (3, 1, 0) + ), + ) ) - ) emits_ol_events = all( (