Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1792,6 +1792,7 @@ TaskInstanceKey
taskinstancekey
taskmeta
taskmixin
tasksDuration
tasksetmeta
tasksState
taskTree
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@ class AirflowStateRunFacet(RunFacet):
Attributes:
dagRunState: This indicates the final status of the entire DAG run (e.g., "success", "failed").
tasksState: A dictionary mapping task IDs to their respective states. (e.g., "failed", "skipped").
tasksDuration: A dictionary mapping task IDs to their durations in seconds.
"""

dagRunState: str
tasksState: dict[str, str]
tasksDuration: dict[str, float]


@define
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ def get_airflow_state_run_facet(
"airflowState": AirflowStateRunFacet(
dagRunState=dag_run_state,
tasksState={ti.task_id: ti.state for ti in tis},
tasksDuration={
ti.task_id: ti.duration
if ti.duration is not None
else (ti.end_date - ti.start_date).total_seconds()
for ti in tis
},
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,23 +870,31 @@ def test_emit_dag_complete_event(
dag_run._state = DagRunState.SUCCESS
dag_run.end_date = event_time
if AIRFLOW_V_3_0_PLUS:
mocked_fetch_tis.return_value = [
TaskInstance(
task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock()
),
TaskInstance(
task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock()
),
TaskInstance(
task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock()
),
]
ti0 = TaskInstance(
task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock()
)
ti1 = TaskInstance(
task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock()
)

ti2 = TaskInstance(
task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock()
)
else:
mocked_fetch_tis.return_value = [
TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS),
TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED),
TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED),
]
ti0 = TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS)
ti1 = TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED)
ti2 = TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED)

ti0.start_date = datetime.datetime(2022, 1, 1, 0, 0, 0)
ti0.end_date = datetime.datetime(2022, 1, 1, 0, 10, 0)

ti1.start_date = datetime.datetime(2022, 1, 1, 0, 10, 2)
ti1.end_date = datetime.datetime(2022, 1, 1, 0, 13, 7)

ti2.start_date = datetime.datetime(2022, 1, 1, 0, 13, 8)
ti2.end_date = datetime.datetime(2022, 1, 1, 0, 14, 0)

mocked_fetch_tis.return_value = [ti0, ti1, ti2]
generate_static_uuid.return_value = random_uuid

adapter.dag_success(
Expand Down Expand Up @@ -942,6 +950,11 @@ def test_emit_dag_complete_event(
task_1.task_id: TaskInstanceState.SKIPPED,
task_2.task_id: TaskInstanceState.FAILED,
},
tasksDuration={
task_0.task_id: 600.0,
task_1.task_id: 185.0,
task_2.task_id: 52.0,
},
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
Expand Down Expand Up @@ -1020,24 +1033,34 @@ def test_emit_dag_failed_event(
)
dag_run._state = DagRunState.FAILED
dag_run.end_date = event_time

if AIRFLOW_V_3_0_PLUS:
mocked_fetch_tis.return_value = [
TaskInstance(
task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock()
),
TaskInstance(
task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock()
),
TaskInstance(
task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock()
),
]
ti0 = TaskInstance(
task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS, dag_version_id=mock.MagicMock()
)
ti1 = TaskInstance(
task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED, dag_version_id=mock.MagicMock()
)

ti2 = TaskInstance(
task=task_2, run_id=run_id, state=TaskInstanceState.FAILED, dag_version_id=mock.MagicMock()
)
else:
mocked_fetch_tis.return_value = [
TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS),
TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED),
TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED),
]
ti0 = TaskInstance(task=task_0, run_id=run_id, state=TaskInstanceState.SUCCESS)
ti1 = TaskInstance(task=task_1, run_id=run_id, state=TaskInstanceState.SKIPPED)
ti2 = TaskInstance(task=task_2, run_id=run_id, state=TaskInstanceState.FAILED)

ti0.start_date = datetime.datetime(2022, 1, 1, 0, 0, 0)
ti0.end_date = datetime.datetime(2022, 1, 1, 0, 10, 0)

ti1.start_date = datetime.datetime(2022, 1, 1, 0, 10, 2)
ti1.end_date = datetime.datetime(2022, 1, 1, 0, 13, 7)

ti2.start_date = datetime.datetime(2022, 1, 1, 0, 13, 8)
ti2.end_date = datetime.datetime(2022, 1, 1, 0, 14, 0)

mocked_fetch_tis.return_value = [ti0, ti1, ti2]

generate_static_uuid.return_value = random_uuid

adapter.dag_failed(
Expand Down Expand Up @@ -1097,6 +1120,11 @@ def test_emit_dag_failed_event(
task_1.task_id: TaskInstanceState.SKIPPED,
task_2.task_id: TaskInstanceState.FAILED,
},
tasksDuration={
task_0.task_id: 600.0,
task_1.task_id: 185.0,
task_2.task_id: 52.0,
},
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ def test_airflow_dag_run_facet():
id="AirflowJobFacet",
),
pytest.param(
AirflowStateRunFacet(dagRunState="SUCCESS", tasksState={"task_0": "SKIPPED"}),
AirflowStateRunFacet(
dagRunState="SUCCESS",
tasksState={"task_0": "SKIPPED"},
tasksDuration={"task_0": 123},
),
id="AirflowStateRunFacet",
),
pytest.param(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from collections.abc import Callable
from concurrent.futures import Future
from contextlib import suppress
from datetime import datetime
from typing import TYPE_CHECKING
from unittest import mock
from unittest.mock import MagicMock, patch
Expand Down Expand Up @@ -876,6 +877,8 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size(
def test_listener_on_dag_run_state_changes(self, mock_emit, method, dag_run_state, create_task_instance):
mock_executor = MockExecutor()
ti = create_task_instance(dag_id="dag", task_id="op")
ti.start_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
ti.end_date = datetime(2020, 1, 1, 1, tzinfo=timezone.utc)
# Change the state explicitly to set end_date following the logic in the method
ti.dag_run.set_state(dag_run_state)
with mock.patch(
Expand Down Expand Up @@ -1799,6 +1802,8 @@ def test_listener_on_dag_run_state_changes_configure_process_pool_size(
def test_listener_on_dag_run_state_changes(self, mock_emit, method, dag_run_state, create_task_instance):
mock_executor = MockExecutor()
ti = create_task_instance(dag_id="dag", task_id="op")
ti.start_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
ti.end_date = datetime(2020, 1, 1, 1, tzinfo=timezone.utc)
# Change the state explicitly to set end_date following the logic in the method
ti.dag_run.set_state(dag_run_state)
with mock.patch(
Expand Down