@classmethod
def timetable_summary(cls, dag: DAG) -> str | None:
    """
    Return a human-readable schedule summary for ``dag``.

    The DAG's own ``timetable_summary`` attribute is preferred when it is present and truthy;
    otherwise the ``summary`` of the DAG's ``timetable`` is used.

    :param dag: DAG-like object to inspect; either attribute may be missing or empty.
    :return: The summary, or ``None`` when neither source provides one.
    """
    # Read each attribute exactly once: it may be a computed property.
    summary = getattr(dag, "timetable_summary", None)
    if summary:
        return summary

    timetable = getattr(dag, "timetable", None)
    if timetable:
        # A timetable-like object lacking ``summary`` yields None instead of raising.
        return getattr(timetable, "summary", None)
    return None
def test_daginfo_timetable_summary():
    """Check that an explicit ``timetable_summary`` is preferred over the timetable's summary."""
    from airflow.timetables.simple import NullTimetable

    # With no explicit summary set, the value comes from the timetable itself.
    dag_stub = MagicMock(timetable=NullTimetable(), timetable_summary=None)
    derived = DagInfo(dag_stub).timetable_summary
    assert derived == "None"

    # Once an explicit summary is present on the DAG, it takes precedence.
    dag_stub.timetable_summary = "explicit_summary"
    explicit = DagInfo(dag_stub).timetable_summary
    assert explicit == "explicit_summary"
"fileloc": pathlib.Path(__file__).resolve().as_posix(), "owner": "", "schedule_interval": "My Team's Baseball Games", + "timetable_summary": "My Team's Baseball Games", "start_date": "2024-06-01T00:00:00+00:00", "tags": "[]", "owner_links": {}, @@ -1151,6 +1167,7 @@ def test_dag_info_schedule_list_single_dataset(self): "fileloc": pathlib.Path(__file__).resolve().as_posix(), "owner": "", "schedule_interval": "Dataset", + "timetable_summary": "Dataset", "start_date": "2024-06-01T00:00:00+00:00", "tags": "[]", "owner_links": {}, @@ -1176,6 +1193,7 @@ def test_dag_info_schedule_list_two_datasets(self): "fileloc": pathlib.Path(__file__).resolve().as_posix(), "owner": "", "schedule_interval": "Dataset", + "timetable_summary": "Dataset", "start_date": "2024-06-01T00:00:00+00:00", "tags": "[]", "owner_links": {}, @@ -1204,6 +1222,7 @@ def test_dag_info_schedule_datasets_logical_condition(self): "fileloc": pathlib.Path(__file__).resolve().as_posix(), "owner": "", "schedule_interval": "Dataset", + "timetable_summary": "Dataset", "start_date": "2024-06-01T00:00:00+00:00", "tags": "[]", "owner_links": {}, @@ -1250,6 +1269,7 @@ def test_dag_info_schedule_dataset_or_time_schedule(self): "fileloc": pathlib.Path(__file__).resolve().as_posix(), "owner": "", "schedule_interval": "Dataset or */4 3 * * *", + "timetable_summary": "Dataset or */4 3 * * *", "start_date": "2024-06-01T00:00:00+00:00", "tags": "[]", "owner_links": {}, @@ -1301,6 +1321,7 @@ def test_dag_info_schedule_single_dataset_directly(self): "owner_links": {}, "timetable": {"dataset_condition": {"__type": "dataset", "uri": "uri1", "extra": {"a": 1}}}, "schedule_interval": "Dataset", + "timetable_summary": "Dataset", }