diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 6b20d228bad16..982c41100a607 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1792,6 +1792,7 @@ TaskInstanceKey taskinstancekey taskmeta taskmixin +tasksDuration tasksetmeta tasksState taskTree diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/facets.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/facets.py index e35d312b70667..9ebf2f6571b8d 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/facets.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/facets.py @@ -76,10 +76,12 @@ class AirflowStateRunFacet(RunFacet): Attributes: dagRunState: This indicates the final status of the entire DAG run (e.g., "success", "failed"). tasksState: A dictionary mapping task IDs to their respective states. (e.g., "failed", "skipped"). + tasksDuration: A dictionary mapping task IDs to their durations in seconds. 
""" dagRunState: str tasksState: dict[str, str] + tasksDuration: dict[str, float] @define diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py index 2316cd3c13dbc..e5d8f993f0bae 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py +++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py @@ -734,6 +734,12 @@ def get_airflow_state_run_facet( "airflowState": AirflowStateRunFacet( dagRunState=dag_run_state, tasksState={ti.task_id: ti.state for ti in tis}, + tasksDuration={ + ti.task_id: ti.duration + if ti.duration is not None + else (ti.end_date - ti.start_date).total_seconds() + for ti in tis + }, ) } diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py index e79e79489f177..4b9044202addb 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_adapter.py @@ -870,23 +870,31 @@ def test_emit_dag_complete_event( dag_run._state = DagRunState.SUCCESS dag_run.end_date = event_time if AIRFLOW_V_3_0_PLUS: - mocked_fetch_tis.return_value = [ - TaskInstance( - task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock() - ), - TaskInstance( - task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock() - ), - TaskInstance( - task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock() - ), - ] + ti0 = TaskInstance( + task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock() + ) + ti1 = TaskInstance( + task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock() + ) + + ti2 = TaskInstance( + task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock() 
+ ) else: - mocked_fetch_tis.return_value = [ - TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), - TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), - TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), - ] + ti0 = TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS) + ti1 = TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED) + ti2 = TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED) + + ti0.start_date = datetime.datetime(2022, 1, 1, 0, 0, 0) + ti0.end_date = datetime.datetime(2022, 1, 1, 0, 10, 0) + + ti1.start_date = datetime.datetime(2022, 1, 1, 0, 10, 2) + ti1.end_date = datetime.datetime(2022, 1, 1, 0, 13, 7) + + ti2.start_date = datetime.datetime(2022, 1, 1, 0, 13, 8) + ti2.end_date = datetime.datetime(2022, 1, 1, 0, 14, 0) + + mocked_fetch_tis.return_value = [ti0, ti1, ti2] generate_static_uuid.return_value = random_uuid adapter.dag_success( @@ -942,6 +950,11 @@ def test_emit_dag_complete_event( task_1.task_id: TaskInstanceState.SKIPPED, task_2.task_id: TaskInstanceState.FAILED, }, + tasksDuration={ + task_0.task_id: 600.0, + task_1.task_id: 185.0, + task_2.task_id: 52.0, + }, ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY @@ -1020,24 +1033,34 @@ def test_emit_dag_failed_event( ) dag_run._state = DagRunState.FAILED dag_run.end_date = event_time + if AIRFLOW_V_3_0_PLUS: - mocked_fetch_tis.return_value = [ - TaskInstance( - task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock() - ), - TaskInstance( - task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock() - ), - TaskInstance( - task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock() - ), - ] + ti0 = TaskInstance( + task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, 
dag_version_id=mock.MagicMock() + ) + ti1 = TaskInstance( + task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock() + ) + + ti2 = TaskInstance( + task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock() + ) else: - mocked_fetch_tis.return_value = [ - TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS), - TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED), - TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED), - ] + ti0 = TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS) + ti1 = TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED) + ti2 = TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED) + + ti0.start_date = datetime.datetime(2022, 1, 1, 0, 0, 0) + ti0.end_date = datetime.datetime(2022, 1, 1, 0, 10, 0) + + ti1.start_date = datetime.datetime(2022, 1, 1, 0, 10, 2) + ti1.end_date = datetime.datetime(2022, 1, 1, 0, 13, 7) + + ti2.start_date = datetime.datetime(2022, 1, 1, 0, 13, 8) + ti2.end_date = datetime.datetime(2022, 1, 1, 0, 14, 0) + + mocked_fetch_tis.return_value = [ti0, ti1, ti2] + generate_static_uuid.return_value = random_uuid adapter.dag_failed( @@ -1097,6 +1120,11 @@ def test_emit_dag_failed_event( task_1.task_id: TaskInstanceState.SKIPPED, task_2.task_id: TaskInstanceState.FAILED, }, + tasksDuration={ + task_0.task_id: 600.0, + task_1.task_id: 185.0, + task_2.task_id: 52.0, + }, ), "processing_engine": processing_engine_run.ProcessingEngineRunFacet( version=ANY, name="Airflow", openlineageAdapterVersion=ANY diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_facets.py b/providers/openlineage/tests/unit/openlineage/plugins/test_facets.py index d46cadc9d69c6..1accbde982f2b 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_facets.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_facets.py @@ -95,7 
+95,11 @@ def test_airflow_dag_run_facet(): id="AirflowJobFacet", ), pytest.param( - AirflowStateRunFacet(dagRunState="SUCCESS", tasksState={"task_0": "SKIPPED"}), + AirflowStateRunFacet( + dagRunState="SUCCESS", + tasksState={"task_0": "SKIPPED"}, + tasksDuration={"task_0": 123}, + ), id="AirflowStateRunFacet", ), pytest.param( diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py index 2f171a6b44bc4..96d2558cc0439 100644 --- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py +++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py @@ -21,6 +21,7 @@ from collections.abc import Callable from concurrent.futures import Future from contextlib import suppress +from datetime import datetime from typing import TYPE_CHECKING from unittest import mock from unittest.mock import MagicMock, patch @@ -876,6 +877,8 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size( def test_listener_on_dag_run_state_changes(self, mock_emit, method, dag_run_state, create_task_instance): mock_executor = MockExecutor() ti = create_task_instance(dag_id="dag", task_id="op") + ti.start_date = datetime(2020, 1, 1, tzinfo=timezone.utc) + ti.end_date = datetime(2020, 1, 1, 1, tzinfo=timezone.utc) # Change the state explicitly to set end_date following the logic in the method ti.dag_run.set_state(dag_run_state) with mock.patch( @@ -1799,6 +1802,8 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size( def test_listener_on_dag_run_state_changes(self, mock_emit, method, dag_run_state, create_task_instance): mock_executor = MockExecutor() ti = create_task_instance(dag_id="dag", task_id="op") + ti.start_date = datetime(2020, 1, 1, tzinfo=timezone.utc) + ti.end_date = datetime(2020, 1, 1, 1, tzinfo=timezone.utc) # Change the state explicitly to set end_date following the logic in the method ti.dag_run.set_state(dag_run_state) 
with mock.patch(