@@ -464,14 +464,25 @@ class DagInfo(InfoJsonEncodable):
"fileloc",
"owner",
"owner_links",
"schedule_interval", # For Airflow 2.
"timetable_summary", # For Airflow 3.
"schedule_interval", # For Airflow 2 only -> AF3 has timetable_summary
"start_date",
"tags",
]
casts = {"timetable": lambda dag: DagInfo.serialize_timetable(dag)}
casts = {
"timetable": lambda dag: DagInfo.serialize_timetable(dag),
"timetable_summary": lambda dag: DagInfo.timetable_summary(dag),
}
renames = {"_dag_id": "dag_id"}

@classmethod
def timetable_summary(cls, dag: DAG) -> str | None:
"""Extract summary from timetable if missing a ``timetable_summary`` property."""
if getattr(dag, "timetable_summary", None):
return dag.timetable_summary
if getattr(dag, "timetable", None):
return dag.timetable.summary
return None
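A minimal sketch of how this fallback resolves (it mirrors the new test further down; NullTimetable is used here only as an example of a timetable with no explicit summary on the DAG):

# Illustrative only, not part of the patch.
from unittest.mock import MagicMock
from airflow.timetables.simple import NullTimetable

dag = MagicMock()
dag.timetable = NullTimetable()
dag.timetable_summary = None
DagInfo.timetable_summary(dag)  # -> "None", derived from dag.timetable.summary

dag.timetable_summary = "@once"
DagInfo.timetable_summary(dag)  # -> "@once", the explicit summary wins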

@classmethod
def serialize_timetable(cls, dag: DAG) -> dict[str, Any]:
# This is enough for Airflow 2.10+ and has all the information needed
@@ -797,6 +808,14 @@ def _emits_ol_events(task: AnyOperator) -> bool:
not getattr(task, "on_success_callback", None),
not task.outlets,
not (task.inlets and get_base_airflow_version_tuple() >= (3, 0, 2)), # Added in 3.0.2 #50773
not (
getattr(task, "has_on_execute_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),
not (
getattr(task, "has_on_success_callback", None) # Added in 3.1.0 #54569
and get_base_airflow_version_tuple() >= (3, 1, 0)
),
)
)
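A hedged reading of the two new checks (assuming the `not (...)` entries above feed an `all(...)` that marks a task as an empty, skippable operator):

# Illustrative only. On Airflow >= 3.1 a callback attached to a task is expected to
# surface via task.has_on_execute_callback / task.has_on_success_callback, e.g.:
#
#     EmptyOperator(task_id="t", on_success_callback=lambda context: None)
#
# which makes the corresponding `not (...)` entry False, so the task is not treated
# as an empty operator and still emits OpenLineage events.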

@@ -182,14 +182,13 @@ def test_get_airflow_dag_run_facet():
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "airflow",
"timetable": {},
"timetable_summary": "@once",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "['test']",
"owner_links": {},
}
if hasattr(dag, "schedule_interval"): # Airflow 2 compat.
expected_dag_info["schedule_interval"] = "@once"
else: # Airflow 3 and up.
expected_dag_info["timetable_summary"] = "@once"
assert result == {
"airflowDagRun": AirflowDagRunFacet(
dag=expected_dag_info,
@@ -1055,7 +1054,21 @@ def test_get_user_provided_run_facets_with_exception(mock_custom_facet_funcs):
assert result == {}


@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 2.9+ tests")
def test_daginfo_timetable_summary():
from airflow.timetables.simple import NullTimetable

dag = MagicMock()
# timetable is enough to get summary
dag.timetable = NullTimetable()
dag.timetable_summary = None
assert DagInfo(dag).timetable_summary == "None"

# but if summary is present, it's preferred
dag.timetable_summary = "explicit_summary"
assert DagInfo(dag).timetable_summary == "explicit_summary"


@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 2 tests")
class TestDagInfoAirflow2:
def test_dag_info(self):
with DAG(
@@ -1080,6 +1093,7 @@ def test_dag_info(self):
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "['test']",
"timetable": {},
"timetable_summary": "@once",
"owner_links": {"some_owner": "https://airflow.apache.org"},
}

@@ -1100,6 +1114,7 @@ def test_dag_info_schedule_cron(self):
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"timetable": {"expression": "*/4 3 * * *", "timezone": "UTC"},
"timetable_summary": "*/4 3 * * *",
"owner_links": {},
}

@@ -1124,6 +1139,7 @@ def test_dag_info_schedule_events_timetable(self):
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "",
"schedule_interval": "My Team's Baseball Games",
"timetable_summary": "My Team's Baseball Games",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"owner_links": {},
@@ -1151,6 +1167,7 @@ def test_dag_info_schedule_list_single_dataset(self):
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "",
"schedule_interval": "Dataset",
"timetable_summary": "Dataset",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"owner_links": {},
@@ -1176,6 +1193,7 @@ def test_dag_info_schedule_list_two_datasets(self):
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "",
"schedule_interval": "Dataset",
"timetable_summary": "Dataset",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"owner_links": {},
@@ -1204,6 +1222,7 @@ def test_dag_info_schedule_datasets_logical_condition(self):
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "",
"schedule_interval": "Dataset",
"timetable_summary": "Dataset",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"owner_links": {},
@@ -1250,6 +1269,7 @@ def test_dag_info_schedule_dataset_or_time_schedule(self):
"fileloc": pathlib.Path(__file__).resolve().as_posix(),
"owner": "",
"schedule_interval": "Dataset or */4 3 * * *",
"timetable_summary": "Dataset or */4 3 * * *",
"start_date": "2024-06-01T00:00:00+00:00",
"tags": "[]",
"owner_links": {},
@@ -1301,6 +1321,7 @@ def test_dag_info_schedule_single_dataset_directly(self):
"owner_links": {},
"timetable": {"dataset_condition": {"__type": "dataset", "uri": "uri1", "extra": {"a": 1}}},
"schedule_interval": "Dataset",
"timetable_summary": "Dataset",
}

