From 1109cad196d3e173dc9473401cb7f890e9622f3b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 24 Jun 2025 18:25:36 +0100 Subject: [PATCH 01/14] Add run_on_latest_version support for backfill and clear operations With this option, users are able to choose the dag version they want to run their dag/task after clearing or when running backfill. This only applies to versioned bundles as non-versioned bundles run with the latest dag version. When the user choose the run with latest version, the bundle_version associated with the dagrun is updated to the latest and the associated serialized dag version updated to the latest. Choosing not to run with latest version which is the default means that the bundle version and serialized dag version that the dag ran with initially would be used in running it again. For backfill, there's now --run-on-latest-version flag that makes it run with the latest version, otherwise it will run with the original bundle the dagrun was created with. Note that it's only useful when rerunning a dagrun using backfill. The default behaviour is using the initial bundle/version and this is intentional otherwise running backfill will fail if there was task rename in the latest version. Summary of changes: - Use SchedulerDagBag instead of DagBag for execution API - Add run_on_latest_version field to DAGRunClearBody and ClearTaskInstancesBody models - Add --run-on-latest-version CLI flag for backfill command - Update backfill.py to support running tasks with latest DAG version - Add UI checkbox for "Run with latest version" in clear dialogs - Update SchedulerDagBag to handle latest version parameter - Update API endpoints to support run_on_latest_version parameter --- airflow-core/src/airflow/api_fastapi/app.py | 4 ++- .../core_api/datamodels/dag_run.py | 1 + .../core_api/datamodels/task_instances.py | 1 + .../openapi/v2-rest-api-generated.yaml | 8 ++++++ .../core_api/routes/public/dag_run.py | 2 ++ .../core_api/routes/public/task_instances.py | 5 ++++ .../execution_api/routes/dag_runs.py | 16 ++++++++--- .../execution_api/routes/task_instances.py | 25 ++++++++++------- airflow-core/src/airflow/cli/cli_config.py | 9 +++++++ .../airflow/cli/commands/backfill_command.py | 2 ++ .../src/airflow/jobs/scheduler_job_runner.py | 15 ++++++----- airflow-core/src/airflow/models/backfill.py | 7 ++++- airflow-core/src/airflow/models/dag.py | 7 +++++ .../src/airflow/models/taskinstance.py | 10 ++++--- .../ui/openapi-gen/requests/schemas.gen.ts | 10 +++++++ .../ui/openapi-gen/requests/types.gen.ts | 2 ++ .../ui/public/i18n/locales/en/dags.json | 1 + .../components/Clear/Run/ClearRunDialog.tsx | 19 ++++++++++--- .../TaskInstance/ClearTaskInstanceDialog.tsx | 13 +++++++-- .../src/airflow/ui/src/components/ui/index.ts | 1 + .../routes/public/test_task_instances.py | 2 +- .../v2025_04_28/test_task_instances.py | 7 +++-- .../cli/commands/test_backfill_command.py | 27 +++++++++++++++++++ .../tests/unit/models/test_backfill.py | 17 +++++++++--- .../airflowctl/api/datamodels/generated.py | 2 ++ .../airflow/sdk/execution_time/supervisor.py | 9 +++---- 26 files changed, 176 insertions(+), 46 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py index f515be6992c60..17de4f677cd14 100644 --- a/airflow-core/src/airflow/api_fastapi/app.py +++ b/airflow-core/src/airflow/api_fastapi/app.py @@ -84,8 +84,10 @@ def create_app(apps: str = "all") -> FastAPI: dag_bag = create_dag_bag() if "execution" in apps_list or "all" in apps_list: + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + task_exec_api_app = create_task_execution_api_app() - task_exec_api_app.state.dag_bag = dag_bag + task_exec_api_app.state.dag_bag = SchedulerDagBag() init_error_handlers(task_exec_api_app) app.mount("/execution", task_exec_api_app) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index e184a3acfe864..ae0b1b21cab4c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -55,6 +55,7 @@ class DAGRunClearBody(StrictBaseModel): dry_run: bool = True only_failed: bool = False + run_on_latest_version: bool = False class DAGRunResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index e2470119a1c80..e3f1516a0b397 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -179,6 +179,7 @@ class ClearTaskInstancesBody(StrictBaseModel): include_downstream: bool = False include_future: bool = False include_past: bool = False + run_on_latest_version: bool = False @model_validator(mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f2fb07734e9e4..ea1cdeb033ad0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -8396,6 +8396,10 @@ components: type: boolean title: Include Past default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + default: false additionalProperties: false type: object title: ClearTaskInstancesBody @@ -9049,6 +9053,10 @@ components: type: boolean title: Only Failed default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + default: false additionalProperties: false type: object title: DAGRunClearBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 5d974ec012cd5..00f93754baf2d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -281,6 +281,7 @@ def clear_dag_run( run_id=dag_run_id, task_ids=None, only_failed=body.only_failed, + run_on_latest_version=body.run_on_latest_version, dry_run=True, session=session, ) @@ -293,6 +294,7 @@ def clear_dag_run( run_id=dag_run_id, task_ids=None, only_failed=body.only_failed, + run_on_latest_version=body.run_on_latest_version, session=session, ) dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 915054cdf792e..33bafc0506b7e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -675,7 +675,11 @@ def post_clear_task_instances( if dag_run is None: error_message = f"Dag Run id {dag_run_id} not found in dag {dag_id}" raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) + # If dag_run_id is provided, we should get the dag from SchedulerDagBag + # to ensure we get the right version. + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + dag = SchedulerDagBag().get_dag(dag_run=dag_run, session=session) if past or future: raise HTTPException( status.HTTP_400_BAD_REQUEST, @@ -724,6 +728,7 @@ def post_clear_task_instances( task_instances, session, DagRunState.QUEUED if reset_dag_runs else False, + run_on_latest_version=body.run_on_latest_version, ) return TaskInstanceCollectionResponse( diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index db27f7bd93ed8..04a3e2ce7ecab 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -29,7 +29,6 @@ from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload from airflow.exceptions import DagRunAlreadyExists from airflow.models.dag import DagModel -from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.utils.types import DagRunTriggeredByType @@ -122,9 +121,20 @@ def clear_dag_run( "message": f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", }, ) + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + + dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id)) + dag_bag = SchedulerDagBag() + dag = dag_bag.get_dag(dag_run=dag_run, session=session) + if not dag: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail={ + "reason": "Not Found", + "message": f"DAG with dag_id: '{dag_id}' was not found in the DagBag", + }, + ) - dag_bag = DagBag(dag_folder=dm.fileloc, read_dags_from_db=True) - dag = dag_bag.get_dag(dag_id) dag.clear(run_id=run_id) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index eef96323d2b7d..def4dd155c6c7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -29,7 +29,7 @@ import attrs import structlog from cadwyn import VersionedAPIRouter -from fastapi import Body, HTTPException, Query, status +from fastapi import Body, Depends, HTTPException, Query, status from pydantic import JsonValue from sqlalchemy import func, or_, tuple_, update from sqlalchemy.exc import NoResultFound, SQLAlchemyError @@ -37,7 +37,7 @@ from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars -from airflow.api_fastapi.common.dagbag import DagBagDep +from airflow.api_fastapi.common.dagbag import dag_bag_from_app from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.types import UtcDateTime from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( @@ -76,6 +76,9 @@ from airflow.models.expandinput import SchedulerExpandInput from airflow.sdk.types import Operator +from airflow.jobs.scheduler_job_runner import SchedulerDagBag + +SchedulerDagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)] router = VersionedAPIRouter() @@ -104,7 +107,7 @@ def ti_run( task_instance_id: UUID, ti_run_payload: Annotated[TIEnterRunningPayload, Body()], session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ) -> TIRunContext: """ Run a TaskInstance. @@ -255,7 +258,7 @@ def ti_run( or 0 ) - if dag := dag_bag.get_dag(ti.dag_id): + if dag := dag_bag.get_dag(dag_run=dr, session=session): upstream_map_indexes = dict( _get_upstream_map_indexes(dag.get_task(ti.task_id), ti.map_index, ti.run_id, session) ) @@ -330,7 +333,7 @@ def ti_update_state( task_instance_id: UUID, ti_patch_payload: Annotated[TIStateUpdate, Body()], session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ): """ Update the state of a TaskInstance. @@ -417,8 +420,9 @@ def ti_update_state( ) -def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> None: - ser_dag = dag_bag.get_dag(dag_id) +def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: SchedulerDagBagDep) -> None: + dr = ti.dag_run + ser_dag = dag_bag.get_dag(dag_run=dr, session=session) if ser_dag and getattr(ser_dag, "fail_fast", False): task_dict = getattr(ser_dag, "task_dict") task_teardown_map = {k: v.is_teardown for k, v in task_dict.items()} @@ -432,7 +436,7 @@ def _create_ti_state_update_query_and_update_state( query: Update, updated_state, session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, dag_id: str, ) -> tuple[Update, TaskInstanceState]: if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)): @@ -893,7 +897,7 @@ def _get_group_tasks(dag_id: str, task_group_id: str, session: SessionDep, logic def validate_inlets_and_outlets( task_instance_id: UUID, session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ) -> InactiveAssetsResponse: """Validate whether there're inactive assets in inlets and outlets of a given task instance.""" ti_id_str = str(task_instance_id) @@ -911,7 +915,8 @@ def validate_inlets_and_outlets( ) if not ti.task: - dag = dag_bag.get_dag(ti.dag_id) + dr = ti.dag_run + dag = dag_bag.get_dag(dag_run=dr, session=session) if dag: with contextlib.suppress(TaskNotFound): ti.task = dag.get_task(ti.task_id) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 363839a85808c..0117d07e1d76b 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -340,6 +340,14 @@ def string_lower_type(val): ), choices=("none", "completed", "failed"), ) +ARG_BACKFILL_RUN_ON_LATEST_VERSION = Arg( + ("--run-on-latest-version",), + help=( + "If set, the backfill will run tasks using the latest DAG version instead of " + "the version that was active when the original DAG run was created." + ), + action="store_true", +) # misc @@ -968,6 +976,7 @@ class GroupCommand(NamedTuple): ARG_RUN_BACKWARDS, ARG_MAX_ACTIVE_RUNS, ARG_BACKFILL_REPROCESS_BEHAVIOR, + ARG_BACKFILL_RUN_ON_LATEST_VERSION, ARG_BACKFILL_DRY_RUN, ), ), diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py b/airflow-core/src/airflow/cli/commands/backfill_command.py index 19108dc4e5412..444bc35aea34a 100644 --- a/airflow-core/src/airflow/cli/commands/backfill_command.py +++ b/airflow-core/src/airflow/cli/commands/backfill_command.py @@ -57,6 +57,7 @@ def create_backfill(args) -> None: reverse=args.run_backwards, dag_run_conf=args.dag_run_conf, reprocess_behavior=reprocess_behavior, + run_on_latest_version=args.run_on_latest_version, ) for k, v in params.items(): console.print(f" - {k} = {v}") @@ -88,4 +89,5 @@ def create_backfill(args) -> None: dag_run_conf=args.dag_run_conf, triggering_user_name=user, reprocess_behavior=reprocess_behavior, + run_on_latest_version=args.run_on_latest_version, ) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 4eb99616133f7..1157c3b51f8df 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -131,15 +131,16 @@ def _get_dag(self, version_id: str, session: Session) -> DAG | None: return dag @staticmethod - def _version_from_dag_run(dag_run, session): - if dag_run.bundle_version: - dag_version = dag_run.created_dag_version - else: + def _version_from_dag_run(dag_run, latest, session): + if latest or not dag_run.bundle_version: dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id, session=session) - return dag_version + if dag_version: + return dag_version + + return dag_run.created_dag_version - def get_dag(self, dag_run: DagRun, session: Session) -> DAG | None: - version = self._version_from_dag_run(dag_run=dag_run, session=session) + def get_dag(self, dag_run: DagRun, session: Session, latest=False) -> DAG | None: + version = self._version_from_dag_run(dag_run=dag_run, latest=latest, session=session) if not version: return None return self._get_dag(version_id=version.id, session=session) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index e8eb3f8b5cd0f..e1fd0e506bfbc 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -291,6 +291,7 @@ def _create_backfill_dag_run( dag_run_conf, backfill_sort_ordinal, triggering_user_name, + run_on_latest_version, session, ): from airflow.models.dagrun import DagRun @@ -328,6 +329,7 @@ def _create_backfill_dag_run( info=info, backfill_id=backfill_id, sort_ordinal=backfill_sort_ordinal, + run_on_latest=run_on_latest_version, ) else: session.add( @@ -401,7 +403,7 @@ def _get_info_list( return dagrun_info_list -def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): +def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal, run_on_latest=False): """Clear the existing DAG run and update backfill metadata.""" from sqlalchemy.sql import update @@ -415,6 +417,7 @@ def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): session=session, confirm_prompt=False, dry_run=False, + run_on_latest_version=run_on_latest, ) # Update backfill_id and run_type in DagRun table @@ -447,6 +450,7 @@ def _create_backfill( dag_run_conf: dict | None, triggering_user_name: str | None, reprocess_behavior: ReprocessBehavior | None = None, + run_on_latest_version: bool = False, ) -> Backfill | None: from airflow.models import DagModel from airflow.models.serialized_dag import SerializedDagModel @@ -510,6 +514,7 @@ def _create_backfill( reprocess_behavior=br.reprocess_behavior, backfill_sort_ordinal=backfill_sort_ordinal, triggering_user_name=br.triggering_user_name, + run_on_latest_version=run_on_latest_version, session=session, ) log.info( diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index a141d96a1c30f..43fc62b098d73 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -1282,6 +1282,7 @@ def clear( only_running: bool = False, confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1299,6 +1300,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: Literal[False] = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1317,6 +1319,7 @@ def clear( only_running: bool = False, confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1335,6 +1338,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: Literal[False] = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1354,6 +1358,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: bool = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1372,6 +1377,7 @@ def clear( :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not be changed. :param dry_run: Find the tasks to clear but don't clear them. + :param run_on_latest_version: whether to run on latest serialized DAG and Bundle version :param session: The sqlalchemy session to use :param dag_bag: The DagBag used to find the dags (Optional) :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``) @@ -1417,6 +1423,7 @@ def clear( list(tis), session, dag_run_state=dag_run_state, + run_on_latest_version=run_on_latest_version, ) else: count = 0 diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index f4ed5573e5993..ca46b4802567a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -203,6 +203,7 @@ def clear_task_instances( tis: list[TaskInstance], session: Session, dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED, + run_on_latest_version: bool = False, ) -> None: """ Clear a set of task instances, but make sure the running ones get killed. @@ -217,6 +218,7 @@ def clear_task_instances( :param session: current session :param dag_run_state: state to set finished DagRuns to. If set to False, DagRuns state will not be changed. + :param run_on_latest_version: whether to run on latest serialized DAG and Bundle version :meta private: """ @@ -234,7 +236,7 @@ def clear_task_instances( ti.state = TaskInstanceState.RESTARTING else: dr = ti.dag_run - ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session) + ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, latest=run_on_latest_version) if not ti_dag: log.warning("No serialized dag found for dag '%s'", dr.dag_id) task_id = ti.task_id @@ -277,12 +279,12 @@ def clear_task_instances( if dr.state in State.finished_dr_states: dr.state = dag_run_state dr.start_date = timezone.utcnow() - dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session) + dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, latest=run_on_latest_version) if not dr_dag: log.warning("No serialized dag found for dag '%s'", dr.dag_id) - if dr_dag and not dr_dag.disable_bundle_versioning: + if dr_dag and not dr_dag.disable_bundle_versioning and run_on_latest_version: bundle_version = dr.dag_model.bundle_version - if bundle_version is not None: + if bundle_version is not None and run_on_latest_version: dr.bundle_version = bundle_version if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index a26e0d4960182..0ed9e31da5610 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1205,6 +1205,11 @@ export const $ClearTaskInstancesBody = { type: 'boolean', title: 'Include Past', default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + default: false } }, additionalProperties: false, @@ -2205,6 +2210,11 @@ export const $DAGRunClearBody = { type: 'boolean', title: 'Only Failed', default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + default: false } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 17e7de16780f7..8b7530ea860d5 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -389,6 +389,7 @@ export type ClearTaskInstancesBody = { include_downstream?: boolean; include_future?: boolean; include_past?: boolean; + run_on_latest_version?: boolean; }; /** @@ -594,6 +595,7 @@ export type DAGResponse = { export type DAGRunClearBody = { dry_run?: boolean; only_failed?: boolean; + run_on_latest_version?: boolean; }; /** diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json index d1ee6a3d0eeb7..196e9016830ba 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json @@ -62,6 +62,7 @@ "onlyFailed": "Clear only failed tasks", "past": "Past", "queueNew": "Queue up new tasks", + "runOnLatestVersion": "Run with latest version", "upstream": "Upstream" } }, diff --git a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx index 696ada0ee3815..0383fb09b4957 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx @@ -23,7 +23,7 @@ import { CgRedo } from "react-icons/cg"; import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ActionAccordion } from "src/components/ActionAccordion"; -import { Button, Dialog } from "src/components/ui"; +import { Button, Dialog, Checkbox } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun"; import { useClearDagRun } from "src/queries/useClearRun"; @@ -43,11 +43,12 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { const [note, setNote] = useState(dagRun.note); const [selectedOptions, setSelectedOptions] = useState>(["existingTasks"]); const onlyFailed = selectedOptions.includes("onlyFailed"); + const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); const { data: affectedTasks = { task_instances: [], total_entries: 0 } } = useClearDagRunDryRun({ dagId, dagRunId, - requestBody: { only_failed: onlyFailed }, + requestBody: { only_failed: onlyFailed, run_on_latest_version: runOnLatestVersion }, }); const { isPending, mutate } = useClearDagRun({ @@ -101,7 +102,13 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { /> - + + setRunOnLatestVersion(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.runOnLatestVersion")} +