diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 367547a8d8e09..dd04f493c9784 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -66,6 +66,7 @@ class DAGRunResponse(BaseModel): queued_at: datetime | None start_date: datetime | None end_date: datetime | None + duration: float | None data_interval_start: datetime | None data_interval_end: datetime | None run_after: datetime diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 8b9c7811b143b..920263e8f2085 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -899,6 +899,11 @@ components: format: date-time - type: 'null' title: End Date + duration: + anyOf: + - type: number + - type: 'null' + title: Duration data_interval_start: anyOf: - type: string @@ -961,6 +966,7 @@ components: - queued_at - start_date - end_date + - duration - data_interval_start - data_interval_end - run_after diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 0901c620b2987..582807ad74bb3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -8610,6 +8610,11 @@ components: format: date-time - type: 'null' title: End Date + duration: + anyOf: + - type: number + - type: 'null' + title: Duration data_interval_start: anyOf: - type: string @@ -8672,6 +8677,7 @@ components: - queued_at - start_date - end_date + - duration - data_interval_start - data_interval_end - run_after diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 44370b3724160..915c83d73bbb1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -324,6 +324,7 @@ def get_dag_runs( "end_date", "updated_at", "conf", + "duration", ], DagRun, {"dag_run_id": "run_id"}, diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index cd5ec56a83993..8dcd00fef0cbc 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -45,6 +45,7 @@ Text, UniqueConstraint, and_, + case, func, not_, or_, @@ -54,9 +55,11 @@ from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy +from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates -from sqlalchemy.sql.expression import case, false, select +from sqlalchemy.sql.elements import Case +from sqlalchemy.sql.expression import false, select from sqlalchemy.sql.functions import coalesce from sqlalchemy_utils import UUIDType @@ -95,6 +98,7 @@ from opentelemetry.sdk.trace import Span from pydantic import NonNegativeInt from sqlalchemy.orm import Query, Session + from sqlalchemy.sql.elements import Case from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG @@ -374,6 +378,28 @@ def version_number(self) -> int | None: return dag_versions[-1].version_number return None + @hybrid_property + def duration(self) -> float | None: + if self.end_date and self.start_date: + return (self.end_date - self.start_date).total_seconds() + return None + + @duration.expression # type: ignore[no-redef] + @provide_session + def duration(cls, session: Session = NEW_SESSION) -> Case: + dialect_name = session.bind.dialect.name + if dialect_name == "mysql": + return func.timestampdiff(text("SECOND"), cls.start_date, cls.end_date) + return case( + [ + ( + (cls.end_date != None) & (cls.start_date != None), # noqa: E711 + func.extract("epoch", cls.end_date - cls.start_date), + ) + ], + else_=None, + ) + @provide_session def check_version_id_exists_in_dr(self, dag_version_id: UUIDType, session: Session = NEW_SESSION): select_stmt = ( diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 7ffda75f42628..56bb0fe4f1bff 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2402,6 +2402,17 @@ export const $DAGRunResponse = { ], title: "End Date", }, + duration: { + anyOf: [ + { + type: "number", + }, + { + type: "null", + }, + ], + title: "Duration", + }, data_interval_start: { anyOf: [ { @@ -2513,6 +2524,7 @@ export const $DAGRunResponse = { "queued_at", "start_date", "end_date", + "duration", "data_interval_start", "data_interval_end", "run_after", diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 02a9746bf4677..79a67a37f8d32 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -626,6 +626,7 @@ export type DAGRunResponse = { queued_at: string | null; start_date: string | null; end_date: string | null; + duration: number | null; data_interval_start: string | null; data_interval_end: string | null; run_after: string; diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx index e304537e4fac6..ffea368655eb2 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx @@ -42,7 +42,7 @@ import { Select } from "src/components/ui"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { dagRunTypeOptions, dagRunStateOptions as stateOptions } from "src/constants/stateOptions"; import DeleteRunButton from "src/pages/DeleteRunButton"; -import { getDuration, useAutoRefresh, isStatePending } from "src/utils"; +import { renderDuration, useAutoRefresh, isStatePending } from "src/utils"; type DagRunRow = { row: { original: DAGRunResponse } }; const { @@ -104,7 +104,8 @@ const runColumns = (translate: TFunction, dagId?: string): Array getDuration(original.start_date, original.end_date), + accessorKey: "duration", + cell: ({ row: { original } }) => renderDuration(original.duration), header: translate("dags:runs.columns.duration"), }, { @@ -129,7 +130,6 @@ const runColumns = (translate: TFunction, dagId?: string): Array { - const seconds = dayjs.duration(dayjs(endDate ?? undefined).diff(startDate ?? undefined)).asSeconds(); - - if (isNaN(seconds) || seconds <= 0) { +export const renderDuration = (durationSeconds: number | null | undefined): string => { + if ( + durationSeconds === null || + durationSeconds === undefined || + isNaN(durationSeconds) || + durationSeconds <= 0 + ) { return "00:00:00"; } - if (seconds < 10) { - return `${seconds.toFixed(2)}s`; + if (durationSeconds < 10) { + return `${durationSeconds.toFixed(2)}s`; } - return seconds < 86_400 - ? dayjs.duration(seconds, "seconds").format("HH:mm:ss") - : dayjs.duration(seconds, "seconds").format("D[d]HH:mm:ss"); + return durationSeconds < 86_400 + ? dayjs.duration(durationSeconds, "seconds").format("HH:mm:ss") + : dayjs.duration(durationSeconds, "seconds").format("D[d]HH:mm:ss"); +}; + +export const getDuration = (startDate?: string | null, endDate?: string | null) => { + const seconds = dayjs.duration(dayjs(endDate ?? undefined).diff(startDate ?? undefined)).asSeconds(); + + return renderDuration(seconds); }; diff --git a/airflow-core/src/airflow/ui/src/utils/index.ts b/airflow-core/src/airflow/ui/src/utils/index.ts index 93f604e57e121..49924fabcad4f 100644 --- a/airflow-core/src/airflow/ui/src/utils/index.ts +++ b/airflow-core/src/airflow/ui/src/utils/index.ts @@ -19,7 +19,7 @@ export { capitalize } from "./capitalize"; export { pluralize } from "./pluralize"; -export { getDuration } from "./datetimeUtils"; +export { getDuration, renderDuration } from "./datetimeUtils"; export { getMetaKey } from "./getMetaKey"; export { useContainerWidth } from "./useContainerWidth"; export * from "./query"; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 2ef6185313081..800a7efccfbec 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1196,6 +1196,7 @@ def test_should_respond_200(self, test_client): "run_after": mock.ANY, "start_date": None, "end_date": None, + "duration": None, "data_interval_start": None, "data_interval_end": None, "last_scheduling_decision": None, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 3c4d16072bcd2..baa7e503c83ed 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -177,6 +177,7 @@ def get_dag_run_dict(run: DagRun): "run_after": from_datetime_to_zulu_without_ms(run.run_after), "start_date": from_datetime_to_zulu_without_ms(run.start_date), "end_date": from_datetime_to_zulu(run.end_date), + "duration": run.duration, "data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start), "data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end), "last_scheduling_decision": ( @@ -308,6 +309,7 @@ def test_should_respond_403(self, unauthorized_test_client): pytest.param("end_date", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_end_date"), pytest.param("updated_at", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_updated_at"), pytest.param("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_conf"), + pytest.param("duration", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_duration"), ], ) @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") @@ -1316,6 +1318,7 @@ def test_should_respond_200( "logical_date": expected_logical_date, "run_after": fixed_now.replace("+00:00", "Z"), "start_date": None, + "duration": None, "state": "queued", "data_interval_end": expected_data_interval_end, "data_interval_start": expected_data_interval_start, @@ -1506,6 +1509,7 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client): "queued_at": now, "start_date": None, "end_date": None, + "duration": None, "run_after": now, "data_interval_start": now, "data_interval_end": now, @@ -1593,6 +1597,7 @@ def test_should_respond_200_with_null_logical_date(self, test_client): "run_after": mock.ANY, "start_date": None, "end_date": None, + "duration": None, "data_interval_start": mock.ANY, "data_interval_end": mock.ANY, "last_scheduling_decision": None, diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py b/airflow-core/tests/unit/cli/commands/test_asset_command.py index c03b3bd945279..e5f85856945f8 100644 --- a/airflow-core/tests/unit/cli/commands/test_asset_command.py +++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py @@ -146,6 +146,7 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None: "dag_display_name": "asset1_producer", "dag_id": "asset1_producer", "end_date": None, + "duration": None, "last_scheduling_decision": None, "note": None, "run_type": "manual", diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 63056402f6a50..330b2a91f72c7 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1314,6 +1314,7 @@ class DAGRunResponse(BaseModel): queued_at: Annotated[datetime | None, Field(title="Queued At")] = None start_date: Annotated[datetime | None, Field(title="Start Date")] = None end_date: Annotated[datetime | None, Field(title="End Date")] = None + duration: Annotated[float | None, Field(title="Duration")] = None data_interval_start: Annotated[datetime | None, Field(title="Data Interval Start")] = None data_interval_end: Annotated[datetime | None, Field(title="Data Interval End")] = None run_after: Annotated[datetime, Field(title="Run After")] diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py index 2ee3560d2360b..7cce6dbe8d82b 100644 --- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py +++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py @@ -78,6 +78,7 @@ class CustomOperatorFromEmpty(EmptyOperator): pass +@pytest.mark.db_test def test_get_airflow_job_facet(): with DAG(dag_id="dag", schedule=None, start_date=datetime.datetime(2024, 6, 1)) as dag: task_0 = BashOperator(task_id="task_0", bash_command="exit 0;") @@ -130,6 +131,7 @@ def test_get_airflow_job_facet(): } +@pytest.mark.db_test def test_get_airflow_dag_run_facet(): with DAG( dag_id="dag", @@ -238,6 +240,7 @@ def test_dag_run_version_no_versions(): @pytest.mark.parametrize("key", ["bundle_name", "bundle_version", "version_id", "version_number"]) +@pytest.mark.db_test def test_dag_run_version(key): dagrun_mock = MagicMock(DagRun) dagrun_mock.dag_versions = [