diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index b67ac7ea9c7af..0a2570a0734d1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -166,6 +166,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
description="(Experimental) Run on the latest bundle version of the dag after "
"clearing the task instances.",
)
+ prevent_running_task: bool = False
@model_validator(mode="before")
@classmethod
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 23fb5c7d8c23d..e980e3772d886 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -7083,6 +7083,12 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
+ '409':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Conflict
'422':
description: Validation Error
content:
@@ -9605,6 +9611,10 @@ components:
description: (Experimental) Run on the latest bundle version of the dag
after clearing the task instances.
default: false
+ prevent_running_task:
+ type: boolean
+ title: Prevent Running Task
+ default: false
additionalProperties: false
type: object
title: ClearTaskInstancesBody
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 21d88da4874e6..e0e730399ad9b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -86,7 +86,7 @@
_patch_ti_validate_request,
)
from airflow.api_fastapi.logging.decorators import action_logging
-from airflow.exceptions import TaskNotFound
+from airflow.exceptions import AirflowClearRunningTaskException, TaskNotFound
from airflow.models import Base, DagRun
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
@@ -711,7 +711,7 @@ def get_mapped_task_instance_try_details(
@task_instances_router.post(
"/clearTaskInstances",
- responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE)),
@@ -805,12 +805,16 @@ def post_clear_task_instances(
)
if not dry_run:
- clear_task_instances(
- task_instances,
- session,
- DagRunState.QUEUED if reset_dag_runs else False,
- run_on_latest_version=body.run_on_latest_version,
- )
+ try:
+ clear_task_instances(
+ task_instances,
+ session,
+ DagRunState.QUEUED if reset_dag_runs else False,
+ run_on_latest_version=body.run_on_latest_version,
+ prevent_running_task=body.prevent_running_task,
+ )
+ except AirflowClearRunningTaskException as e:
+ raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e
return TaskInstanceCollectionResponse(
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in task_instances],
diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py
index 6d65ad8723756..5e6ceecab10d0 100644
--- a/airflow-core/src/airflow/exceptions.py
+++ b/airflow-core/src/airflow/exceptions.py
@@ -535,3 +535,7 @@ def __getattr__(name: str):
return AirflowDagCycleException
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")
+
+
+class AirflowClearRunningTaskException(AirflowException):
+ """Raise when the user attempts to clear currently running tasks."""
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 81c44eb2b0b68..1af338edfaf12 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -194,6 +194,7 @@ def clear_task_instances(
session: Session,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
run_on_latest_version: bool = False,
+ prevent_running_task: bool | None = None,
) -> None:
"""
Clear a set of task instances, but make sure the running ones get killed.
@@ -213,16 +214,25 @@ def clear_task_instances(
:meta private:
"""
task_instance_ids: list[str] = []
+ from airflow.exceptions import AirflowClearRunningTaskException
from airflow.models.dagbag import DBDagBag
scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)
+
if ti.state == TaskInstanceState.RUNNING:
- # If a task is cleared when running, set its state to RESTARTING so that
- # the task is terminated and becomes eligible for retry.
+ if prevent_running_task:
+ raise AirflowClearRunningTaskException(
+ "AirflowClearRunningTaskException: Disable 'prevent_running_task' to proceed, or wait until the task is not running, queued, or scheduled state."
+ )
+ # Prevents the task from re-running and clearing when prevent_running_task from the frontend and the tas is running is True.
+
ti.state = TaskInstanceState.RESTARTING
+ # If a task is cleared when running and the prevent_running_task is false,
+ # set its state to RESTARTING so that
+ # the task is terminated and becomes eligible for retry.
else:
dr = ti.dag_run
if run_on_latest_version:
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e2de6b6c9e4b4..b1a4803ce4260 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1333,6 +1333,11 @@ export const $ClearTaskInstancesBody = {
title: 'Run On Latest Version',
description: '(Experimental) Run on the latest bundle version of the dag after clearing the task instances.',
default: false
+ },
+ prevent_running_task: {
+ type: 'boolean',
+ title: 'Prevent Running Task',
+ default: false
}
},
additionalProperties: false,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index c2b866d87ce8a..808f54ab9ad19 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -2557,6 +2557,7 @@ export class TaskInstanceService {
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
+ 409: 'Conflict',
422: 'Validation Error'
}
});
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 2b814b1ebea1e..ea955c6c279a5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -422,6 +422,7 @@ export type ClearTaskInstancesBody = {
* (Experimental) Run on the latest bundle version of the dag after clearing the task instances.
*/
run_on_latest_version?: boolean;
+ prevent_running_task?: boolean;
};
/**
@@ -5436,6 +5437,10 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
+ /**
+ * Conflict
+ */
+ 409: HTTPExceptionResponse;
/**
* Validation Error
*/
diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
index 6705e10196206..aef5d3f65eea0 100644
--- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
+++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json
@@ -63,7 +63,12 @@
"past": "Past",
"queueNew": "Queue up new tasks",
"runOnLatestVersion": "Run with latest bundle version",
- "upstream": "Upstream"
+ "upstream": "Upstream",
+ "preventRunningTasks": "Prevent rerun if task is running"
+ },
+ "confirmationDialog": {
+ "title": "Cannot Clear Task Instance",
+ "description": "Task is currently in a {{state}} state started by user {{user}} at {{time}}. \nThe user is unable to clear this task until it is done running or a user unchecks the \"Prevent rerun of running tasks\" option in the clear task dialog."
}
},
"search": {
diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
new file mode 100644
index 0000000000000..a76a2ae778fe6
--- /dev/null
+++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceConfirmationDialog.tsx
@@ -0,0 +1,151 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useEffect, useState, useCallback } from "react";
+import { VStack, Icon, Text, Spinner } from "@chakra-ui/react";
+import { GoAlertFill } from "react-icons/go";
+import { useTranslation } from "react-i18next";
+import { Button, Dialog } from "src/components/ui";
+import { useClearTaskInstancesDryRun } from "src/queries/useClearTaskInstancesDryRun";
+import { getRelativeTime } from "src/utils/datetimeUtils";
+
+type Props = {
+ readonly dagDetails?: {
+ dagId: string;
+ dagRunId: string;
+ downstream?: boolean;
+ future?: boolean;
+ mapIndex?: number;
+ onlyFailed?: boolean;
+ past?: boolean;
+ taskId: string;
+ upstream?: boolean;
+ };
+ readonly onClose: () => void;
+ readonly onConfirm?: () => void;
+ readonly open: boolean;
+ readonly preventRunningTask: boolean;
+};
+
+const ClearTaskInstanceConfirmationDialog = ({
+ dagDetails,
+ onClose,
+ onConfirm,
+ open,
+ preventRunningTask,
+}: Props) => {
+ const { t: translate } = useTranslation();
+ const { data, isFetching } = useClearTaskInstancesDryRun({
+ dagId: dagDetails?.dagId ?? "",
+ options: {
+ enabled: open && Boolean(dagDetails),
+ gcTime: 0,
+ refetchOnMount: "always",
+ refetchOnWindowFocus: false,
+ staleTime: 0,
+ },
+ requestBody: {
+ dag_run_id: dagDetails?.dagRunId ?? "",
+ include_downstream: dagDetails?.downstream,
+ include_future: dagDetails?.future,
+ include_past: dagDetails?.past,
+ include_upstream: dagDetails?.upstream,
+ only_failed: dagDetails?.onlyFailed,
+ task_ids: [[dagDetails?.taskId ?? "", dagDetails?.mapIndex ?? 0]],
+ },
+ });
+
+ const [isReady, setIsReady] = useState(false);
+
+ const handleConfirm = useCallback(() => {
+ if (onConfirm) onConfirm();
+ onClose();
+ }, [onConfirm, onClose]);
+
+ const taskInstances = data?.task_instances ?? [];
+ const [firstInstance] = taskInstances;
+ const taskCurrentState = firstInstance?.state;
+
+ useEffect(() => {
+ if (!isFetching && open && data) {
+ const isInTriggeringState =
+ taskCurrentState === "queued" || taskCurrentState === "scheduled";
+
+ if (!preventRunningTask || !isInTriggeringState) {
+ handleConfirm();
+ } else {
+ setIsReady(true);
+ }
+ }
+ }, [isFetching, data, open, handleConfirm, taskCurrentState, preventRunningTask]);
+
+ return (
+
+
+ {isFetching ? (
+
+
+
+ {translate("common:task.documentation")}
+
+
+ ) : isReady ? (
+ <>
+
+
+
+
+
+
+ {translate("dags:runAndTaskActions.confirmationDialog.title")}
+
+
+ {taskInstances.length > 0 && (
+ <>
+ {translate(
+ "dags:runAndTaskActions.confirmationDialog.description",
+ {
+ state: taskCurrentState,
+ time:
+ firstInstance?.start_date !== null && firstInstance?.start_date !== undefined
+ ? getRelativeTime(firstInstance.start_date)
+ : undefined,
+ user:
+ (firstInstance?.unixname?.trim().length ?? 0) > 0
+ ? firstInstance?.unixname
+ : "unknown user",
+ }
+ )}
+ >
+ )}
+
+
+
+
+
+
+ >
+ ) : null}
+
+
+ );
+};
+
+export default ClearTaskInstanceConfirmationDialog;
diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index 31ac40278df36..fa4e85a3bf0d2 100644
--- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Flex, Heading, VStack } from "@chakra-ui/react";
+import { Flex, Heading, VStack, useDisclosure } from "@chakra-ui/react";
import { useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
@@ -30,6 +30,7 @@ import SegmentedControl from "src/components/ui/SegmentedControl";
import { useClearTaskInstances } from "src/queries/useClearTaskInstances";
import { useClearTaskInstancesDryRun } from "src/queries/useClearTaskInstancesDryRun";
import { usePatchTaskInstance } from "src/queries/usePatchTaskInstance";
+import ClearTaskInstanceConfirmationDialog from "./ClearTaskInstanceConfirmationDialog";
type Props = {
readonly onClose: () => void;
@@ -37,10 +38,12 @@ type Props = {
readonly taskInstance: TaskInstanceResponse;
};
-const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
+const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, open: openDialog, taskInstance }: Props) => {
const taskId = taskInstance.task_id;
const mapIndex = taskInstance.map_index;
const { t: translate } = useTranslation();
+ const { onClose, onOpen, open } = useDisclosure();
+
const dagId = taskInstance.dag_id;
const dagRunId = taskInstance.dag_run_id;
@@ -48,7 +51,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
const { isPending, mutate } = useClearTaskInstances({
dagId,
dagRunId,
- onSuccessConfirm: onClose,
+ onSuccessConfirm: onCloseDialog,
});
const [selectedOptions, setSelectedOptions] = useState>([]);
@@ -59,6 +62,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+ const [preventRunningTask, setPreventRunningTask] = useState(true);
const [note, setNote] = useState(taskInstance.note);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchTaskInstance } = usePatchTaskInstance({
@@ -76,7 +80,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
const { data } = useClearTaskInstancesDryRun({
dagId,
options: {
- enabled: open,
+ enabled: openDialog,
refetchOnMount: "always",
},
requestBody: {
@@ -106,7 +110,8 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
taskInstanceDagVersionBundleVersion !== "";
return (
-
+ <>
+
@@ -170,35 +175,18 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
) : undefined}
+ setPreventRunningTask(Boolean(event.checked))}
+ style={{ marginRight: "auto"}}
+ >
+ {translate("dags:runAndTaskActions.options.preventRunningTasks")}
+
@@ -206,6 +194,50 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
+ {open ? {
+ mutate({
+ dagId,
+ requestBody: {
+ dag_run_id: dagRunId,
+ dry_run: false,
+ include_downstream: downstream,
+ include_future: future,
+ include_past: past,
+ include_upstream: upstream,
+ only_failed: onlyFailed,
+ run_on_latest_version: runOnLatestVersion,
+ task_ids: [[taskId, mapIndex]],
+ ...(preventRunningTask ? { prevent_running_task: true } : {}),
+ },
+ });
+ if (note !== taskInstance.note) {
+ mutatePatchTaskInstance({
+ dagId,
+ dagRunId,
+ mapIndex,
+ requestBody: { note },
+ taskId,
+ });
+ }
+ onCloseDialog();
+ }}
+ open={open}
+ preventRunningTask={preventRunningTask}
+ /> : null}
+ >
);
};
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
index b20dae1075f07..a04db2fb31dd8 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearTaskInstances.ts
@@ -19,6 +19,7 @@
import { useQueryClient } from "@tanstack/react-query";
import { useTranslation } from "react-i18next";
+
import {
UseDagRunServiceGetDagRunKeyFn,
useDagRunServiceGetDagRunsKey,
@@ -33,6 +34,7 @@ import { toaster } from "src/components/ui";
import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
+import type { ApiError } from "openapi/requests";
export const useClearTaskInstances = ({
dagId,
@@ -46,14 +48,40 @@ export const useClearTaskInstances = ({
const queryClient = useQueryClient();
const { t: translate } = useTranslation("dags");
- const onError = (error: Error) => {
+ const onError = (error: unknown) => {
+ let detail: string;
+ let description: string;
+
+ // Narrow the type safely
+ if (typeof error === "object" && error !== null) {
+ const apiError = error as ApiError;
+
+ description = typeof apiError.message === "string" ? apiError.message : "";
+ const apiErrorWithDetail = apiError as unknown as { detail?: unknown };
+ detail =
+ typeof apiErrorWithDetail.body.detail === "string"
+ ? apiErrorWithDetail.body.detail
+ : "";
+
+ if ( detail.includes("AirflowClearRunningTaskException") === true ) {
+ description = detail
+ }
+
+ } else {
+ // Fallback for completely unknown errors
+ description = translate("common:error.defaultMessage")
+ }
+
toaster.create({
- description: error.message,
- title: translate("dags:runAndTaskActions.clear.error", { type: translate("taskInstance_one") }),
- type: "error",
- });
+ description: description,
+ title: translate("dags:runAndTaskActions.clear.error", {
+ type: translate("common:taskInstance_one"),
+ }),
+ type: "error",
+ });
};
+
const onSuccess = async (
_: TaskInstanceCollectionResponse,
variables: { dagId: string; requestBody: ClearTaskInstancesBody },
@@ -103,5 +131,8 @@ export const useClearTaskInstances = ({
return useTaskInstanceServicePostClearTaskInstances({
onError,
onSuccess,
+ // This function uses the mutation function of React
+ // For showing the error toast immediately, set retry to 0
+ retry: 0
});
};
diff --git a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
index 2ec8c47fd09fd..c8faa00b0c9af 100644
--- a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
+++ b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { describe, it, expect } from "vitest";
+import { describe, it, expect, vi, beforeAll, afterAll } from "vitest";
-import { getDuration, renderDuration } from "./datetimeUtils";
+import { getDuration, renderDuration, getRelativeTime } from "./datetimeUtils";
describe("getDuration", () => {
it("handles durations less than 60 seconds", () => {
@@ -56,3 +56,32 @@ describe("getDuration", () => {
expect(renderDuration(0.000_01)).toBe(undefined);
});
});
+
+describe("getRelativeTime", () => {
+ const fixedNow = new Date("2024-03-14T10:00:10.000Z");
+
+ beforeAll(() => {
+ vi.useFakeTimers();
+ vi.setSystemTime(fixedNow);
+ });
+
+ afterAll(() => {
+ vi.useRealTimers();
+ });
+
+ it("returns relative time for a valid date", () => {
+ const date = "2024-03-14T10:00:00.000Z";
+
+ expect(getRelativeTime(date)).toBe("a few seconds ago");
+ });
+
+ it("returns an empty string for undefined dates", () => {
+ expect(getRelativeTime(undefined)).toBe("");
+ });
+
+ it("handles future dates", () => {
+ const futureDate = "2024-03-14T10:00:20.000Z";
+
+ expect(getRelativeTime(futureDate)).toBe("in a few seconds");
+ });
+});
diff --git a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
index abd598e3debce..dc4eea6731fb7 100644
--- a/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
+++ b/airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
@@ -19,8 +19,10 @@
import dayjs from "dayjs";
import dayjsDuration from "dayjs/plugin/duration";
import tz from "dayjs/plugin/timezone";
+import relativeTime from "dayjs/plugin/relativeTime";
dayjs.extend(dayjsDuration);
+dayjs.extend(relativeTime);
dayjs.extend(tz);
export const DEFAULT_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss";
@@ -59,3 +61,11 @@ export const formatDate = (
return dayjs(date).tz(timezone).format(format);
};
+
+export const getRelativeTime = (date: string | null | undefined): string => {
+ if (date === null || date === "" || date === undefined) {
+ return "";
+ }
+
+ return dayjs(date).fromNow();
+};
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index f780ad74ddf81..2edb67efe97be 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -2903,7 +2903,9 @@ def test_clear_taskinstance_is_called_with_queued_dr_state(self, mock_clearti, t
# dag (3rd argument) is a different session object. Manually asserting that the dag_id
# is the same.
- mock_clearti.assert_called_once_with([], mock.ANY, DagRunState.QUEUED, run_on_latest_version=False)
+ mock_clearti.assert_called_once_with(
+ [], mock.ANY, DagRunState.QUEUED, prevent_running_task=False, run_on_latest_version=False
+ )
def test_clear_taskinstance_is_called_with_invalid_task_ids(self, test_client, session):
"""Test that dagrun is running when invalid task_ids are passed to clearTaskInstances API."""
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 2bf729114b85b..814ccab3e210f 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -172,6 +172,7 @@ class ClearTaskInstancesBody(BaseModel):
title="Run On Latest Version",
),
] = False
+ prevent_running_task: Annotated[bool | None, Field(title="Prevent Running Task")] = False
class Value(RootModel[list]):