Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class DAGRunResponse(BaseModel):
queued_at: datetime | None
start_date: datetime | None
end_date: datetime | None
duration: float | None
data_interval_start: datetime | None
data_interval_end: datetime | None
run_after: datetime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,11 @@ components:
format: date-time
- type: 'null'
title: End Date
duration:
anyOf:
- type: number
- type: 'null'
title: Duration
data_interval_start:
anyOf:
- type: string
Expand Down Expand Up @@ -961,6 +966,7 @@ components:
- queued_at
- start_date
- end_date
- duration
- data_interval_start
- data_interval_end
- run_after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8610,6 +8610,11 @@ components:
format: date-time
- type: 'null'
title: End Date
duration:
anyOf:
- type: number
- type: 'null'
title: Duration
data_interval_start:
anyOf:
- type: string
Expand Down Expand Up @@ -8672,6 +8677,7 @@ components:
- queued_at
- start_date
- end_date
- duration
- data_interval_start
- data_interval_end
- run_after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ def get_dag_runs(
"end_date",
"updated_at",
"conf",
"duration",
],
DagRun,
{"dag_run_id": "run_id"},
Expand Down
28 changes: 27 additions & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
Text,
UniqueConstraint,
and_,
case,
func,
not_,
or_,
Expand All @@ -54,9 +55,11 @@
from sqlalchemy.dialects import postgresql
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates
from sqlalchemy.sql.expression import case, false, select
from sqlalchemy.sql.elements import Case
from sqlalchemy.sql.expression import false, select
from sqlalchemy.sql.functions import coalesce
from sqlalchemy_utils import UUIDType

Expand Down Expand Up @@ -95,6 +98,7 @@
from opentelemetry.sdk.trace import Span
from pydantic import NonNegativeInt
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.elements import Case

from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
Expand Down Expand Up @@ -374,6 +378,28 @@ def version_number(self) -> int | None:
return dag_versions[-1].version_number
return None

@hybrid_property
def duration(self) -> float | None:
if self.end_date and self.start_date:
return (self.end_date - self.start_date).total_seconds()
return None

@duration.expression # type: ignore[no-redef]
@provide_session
def duration(cls, session: Session = NEW_SESSION) -> Case:
dialect_name = session.bind.dialect.name
if dialect_name == "mysql":
return func.timestampdiff(text("SECOND"), cls.start_date, cls.end_date)
return case(
[
(
(cls.end_date != None) & (cls.start_date != None), # noqa: E711
func.extract("epoch", cls.end_date - cls.start_date),
)
],
else_=None,
)

@provide_session
def check_version_id_exists_in_dr(self, dag_version_id: UUIDType, session: Session = NEW_SESSION):
select_stmt = (
Expand Down
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2402,6 +2402,17 @@ export const $DAGRunResponse = {
],
title: "End Date",
},
duration: {
anyOf: [
{
type: "number",
},
{
type: "null",
},
],
title: "Duration",
},
data_interval_start: {
anyOf: [
{
Expand Down Expand Up @@ -2513,6 +2524,7 @@ export const $DAGRunResponse = {
"queued_at",
"start_date",
"end_date",
"duration",
"data_interval_start",
"data_interval_end",
"run_after",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ export type DAGRunResponse = {
queued_at: string | null;
start_date: string | null;
end_date: string | null;
duration: number | null;
data_interval_start: string | null;
data_interval_end: string | null;
run_after: string;
Expand Down
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import { Select } from "src/components/ui";
import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams";
import { dagRunTypeOptions, dagRunStateOptions as stateOptions } from "src/constants/stateOptions";
import DeleteRunButton from "src/pages/DeleteRunButton";
import { getDuration, useAutoRefresh, isStatePending } from "src/utils";
import { renderDuration, useAutoRefresh, isStatePending } from "src/utils";

type DagRunRow = { row: { original: DAGRunResponse } };
const {
Expand Down Expand Up @@ -104,7 +104,8 @@ const runColumns = (translate: TFunction, dagId?: string): Array<ColumnDef<DAGRu
header: translate("dags:runs.columns.endDate"),
},
{
cell: ({ row: { original } }) => getDuration(original.start_date, original.end_date),
accessorKey: "duration",
cell: ({ row: { original } }) => renderDuration(original.duration),
header: translate("dags:runs.columns.duration"),
},
{
Expand All @@ -129,7 +130,6 @@ const runColumns = (translate: TFunction, dagId?: string): Array<ColumnDef<DAGRu

return undefined;
},
enableSorting: false,
header: translate("dags:runs.columns.conf"),
},
{
Expand Down
27 changes: 18 additions & 9 deletions airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,27 @@ import dayjsDuration from "dayjs/plugin/duration";

dayjs.extend(dayjsDuration);

export const getDuration = (startDate?: string | null, endDate?: string | null) => {
const seconds = dayjs.duration(dayjs(endDate ?? undefined).diff(startDate ?? undefined)).asSeconds();

if (isNaN(seconds) || seconds <= 0) {
export const renderDuration = (durationSeconds: number | null | undefined): string => {
if (
durationSeconds === null ||
durationSeconds === undefined ||
isNaN(durationSeconds) ||
durationSeconds <= 0
) {
return "00:00:00";
}

if (seconds < 10) {
return `${seconds.toFixed(2)}s`;
if (durationSeconds < 10) {
return `${durationSeconds.toFixed(2)}s`;
}

return seconds < 86_400
? dayjs.duration(seconds, "seconds").format("HH:mm:ss")
: dayjs.duration(seconds, "seconds").format("D[d]HH:mm:ss");
return durationSeconds < 86_400
? dayjs.duration(durationSeconds, "seconds").format("HH:mm:ss")
: dayjs.duration(durationSeconds, "seconds").format("D[d]HH:mm:ss");
};

export const getDuration = (startDate?: string | null, endDate?: string | null) => {
const seconds = dayjs.duration(dayjs(endDate ?? undefined).diff(startDate ?? undefined)).asSeconds();

return renderDuration(seconds);
};
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/ui/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

export { capitalize } from "./capitalize";
export { pluralize } from "./pluralize";
export { getDuration } from "./datetimeUtils";
export { getDuration, renderDuration } from "./datetimeUtils";
export { getMetaKey } from "./getMetaKey";
export { useContainerWidth } from "./useContainerWidth";
export * from "./query";
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,7 @@ def test_should_respond_200(self, test_client):
"run_after": mock.ANY,
"start_date": None,
"end_date": None,
"duration": None,
"data_interval_start": None,
"data_interval_end": None,
"last_scheduling_decision": None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def get_dag_run_dict(run: DagRun):
"run_after": from_datetime_to_zulu_without_ms(run.run_after),
"start_date": from_datetime_to_zulu_without_ms(run.start_date),
"end_date": from_datetime_to_zulu(run.end_date),
"duration": run.duration,
"data_interval_start": from_datetime_to_zulu_without_ms(run.data_interval_start),
"data_interval_end": from_datetime_to_zulu_without_ms(run.data_interval_end),
"last_scheduling_decision": (
Expand Down Expand Up @@ -308,6 +309,7 @@ def test_should_respond_403(self, unauthorized_test_client):
pytest.param("end_date", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_end_date"),
pytest.param("updated_at", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_updated_at"),
pytest.param("conf", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_conf"),
pytest.param("duration", [DAG1_RUN1_ID, DAG1_RUN2_ID], id="order_by_duration"),
],
)
@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
Expand Down Expand Up @@ -1316,6 +1318,7 @@ def test_should_respond_200(
"logical_date": expected_logical_date,
"run_after": fixed_now.replace("+00:00", "Z"),
"start_date": None,
"duration": None,
"state": "queued",
"data_interval_end": expected_data_interval_end,
"data_interval_start": expected_data_interval_start,
Expand Down Expand Up @@ -1506,6 +1509,7 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client):
"queued_at": now,
"start_date": None,
"end_date": None,
"duration": None,
"run_after": now,
"data_interval_start": now,
"data_interval_end": now,
Expand Down Expand Up @@ -1593,6 +1597,7 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
"run_after": mock.ANY,
"start_date": None,
"end_date": None,
"duration": None,
"data_interval_start": mock.ANY,
"data_interval_end": mock.ANY,
"last_scheduling_decision": None,
Expand Down
1 change: 1 addition & 0 deletions airflow-core/tests/unit/cli/commands/test_asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def test_cli_assets_materialize(parser: ArgumentParser) -> None:
"dag_display_name": "asset1_producer",
"dag_id": "asset1_producer",
"end_date": None,
"duration": None,
"last_scheduling_decision": None,
"note": None,
"run_type": "manual",
Expand Down
1 change: 1 addition & 0 deletions airflow-ctl/src/airflowctl/api/datamodels/generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,7 @@ class DAGRunResponse(BaseModel):
queued_at: Annotated[datetime | None, Field(title="Queued At")] = None
start_date: Annotated[datetime | None, Field(title="Start Date")] = None
end_date: Annotated[datetime | None, Field(title="End Date")] = None
duration: Annotated[float | None, Field(title="Duration")] = None
data_interval_start: Annotated[datetime | None, Field(title="Data Interval Start")] = None
data_interval_end: Annotated[datetime | None, Field(title="Data Interval End")] = None
run_after: Annotated[datetime, Field(title="Run After")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class CustomOperatorFromEmpty(EmptyOperator):
pass


@pytest.mark.db_test
def test_get_airflow_job_facet():
with DAG(dag_id="dag", schedule=None, start_date=datetime.datetime(2024, 6, 1)) as dag:
task_0 = BashOperator(task_id="task_0", bash_command="exit 0;")
Expand Down Expand Up @@ -130,6 +131,7 @@ def test_get_airflow_job_facet():
}


@pytest.mark.db_test
def test_get_airflow_dag_run_facet():
with DAG(
dag_id="dag",
Expand Down Expand Up @@ -238,6 +240,7 @@ def test_dag_run_version_no_versions():


@pytest.mark.parametrize("key", ["bundle_name", "bundle_version", "version_id", "version_number"])
@pytest.mark.db_test
def test_dag_run_version(key):
dagrun_mock = MagicMock(DagRun)
dagrun_mock.dag_versions = [
Expand Down