diff --git a/airflow-core/docs/core-concepts/dag-run.rst b/airflow-core/docs/core-concepts/dag-run.rst index af7e727de2b33..1cb121c9b721b 100644 --- a/airflow-core/docs/core-concepts/dag-run.rst +++ b/airflow-core/docs/core-concepts/dag-run.rst @@ -167,6 +167,8 @@ the errors after going through the logs, you can re-run the tasks by clearing th scheduled date. Clearing a task instance creates a record of the task instance. The ``try_number`` of the current task instance is incremented, the ``max_tries`` set to ``0`` and the state set to ``None``, which causes the task to re-run. +An experimental feature in Airflow 3.1.0 allows you to clear the task instances and re-run with the latest bundle version. + Click on the failed task in the Tree or Graph views and then click on **Clear**. The executor will re-run it. diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py index f515be6992c60..17de4f677cd14 100644 --- a/airflow-core/src/airflow/api_fastapi/app.py +++ b/airflow-core/src/airflow/api_fastapi/app.py @@ -84,8 +84,10 @@ def create_app(apps: str = "all") -> FastAPI: dag_bag = create_dag_bag() if "execution" in apps_list or "all" in apps_list: + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + task_exec_api_app = create_task_execution_api_app() - task_exec_api_app.state.dag_bag = dag_bag + task_exec_api_app.state.dag_bag = SchedulerDagBag() init_error_handlers(task_exec_api_app) app.mount("/execution", task_exec_api_app) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index e184a3acfe864..bb79fa671ed2f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -55,6 +55,10 @@ class DAGRunClearBody(StrictBaseModel): dry_run: bool = True only_failed: bool = False + run_on_latest_version: bool = Field( + default=False, + description="(Experimental) Run on the latest bundle version of the DAG after clearing the DAG Run.", + ) class DAGRunResponse(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index e2470119a1c80..bb04e829652a0 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -179,6 +179,11 @@ class ClearTaskInstancesBody(StrictBaseModel): include_downstream: bool = False include_future: bool = False include_past: bool = False + run_on_latest_version: bool = Field( + default=False, + description="(Experimental) Run on the latest bundle version of the DAG after " + "clearing the task instances.", + ) @model_validator(mode="before") @classmethod diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f2fb07734e9e4..483c7688f9287 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -8396,6 +8396,12 @@ components: type: boolean title: Include Past default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + description: (Experimental) Run on the latest bundle version of the DAG + after clearing the task instances. + default: false additionalProperties: false type: object title: ClearTaskInstancesBody @@ -9049,6 +9055,12 @@ components: type: boolean title: Only Failed default: false + run_on_latest_version: + type: boolean + title: Run On Latest Version + description: (Experimental) Run on the latest bundle version of the DAG + after clearing the DAG Run. + default: false additionalProperties: false type: object title: DAGRunClearBody diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 5d974ec012cd5..00f93754baf2d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -281,6 +281,7 @@ def clear_dag_run( run_id=dag_run_id, task_ids=None, only_failed=body.only_failed, + run_on_latest_version=body.run_on_latest_version, dry_run=True, session=session, ) @@ -293,6 +294,7 @@ def clear_dag_run( run_id=dag_run_id, task_ids=None, only_failed=body.only_failed, + run_on_latest_version=body.run_on_latest_version, session=session, ) dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id)) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 915054cdf792e..33bafc0506b7e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -675,7 +675,11 @@ def post_clear_task_instances( if dag_run is None: error_message = f"Dag Run id {dag_run_id} not found in dag {dag_id}" raise HTTPException(status.HTTP_404_NOT_FOUND, error_message) + # If dag_run_id is provided, we should get the dag from SchedulerDagBag + # to ensure we get the right version. + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + dag = SchedulerDagBag().get_dag(dag_run=dag_run, session=session) if past or future: raise HTTPException( status.HTTP_400_BAD_REQUEST, @@ -724,6 +728,7 @@ def post_clear_task_instances( task_instances, session, DagRunState.QUEUED if reset_dag_runs else False, + run_on_latest_version=body.run_on_latest_version, ) return TaskInstanceCollectionResponse( diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py index db27f7bd93ed8..04a3e2ce7ecab 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py @@ -29,7 +29,6 @@ from airflow.api_fastapi.execution_api.datamodels.dagrun import DagRunStateResponse, TriggerDAGRunPayload from airflow.exceptions import DagRunAlreadyExists from airflow.models.dag import DagModel -from airflow.models.dagbag import DagBag from airflow.models.dagrun import DagRun from airflow.utils.types import DagRunTriggeredByType @@ -122,9 +121,20 @@ def clear_dag_run( "message": f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered", }, ) + from airflow.jobs.scheduler_job_runner import SchedulerDagBag + + dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == run_id)) + dag_bag = SchedulerDagBag() + dag = dag_bag.get_dag(dag_run=dag_run, session=session) + if not dag: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + detail={ + "reason": "Not Found", + "message": f"DAG with dag_id: '{dag_id}' was not found in the DagBag", + }, + ) - dag_bag = DagBag(dag_folder=dm.fileloc, read_dags_from_db=True) - dag = dag_bag.get_dag(dag_id) dag.clear(run_id=run_id) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index eef96323d2b7d..def4dd155c6c7 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -29,7 +29,7 @@ import attrs import structlog from cadwyn import VersionedAPIRouter -from fastapi import Body, HTTPException, Query, status +from fastapi import Body, Depends, HTTPException, Query, status from pydantic import JsonValue from sqlalchemy import func, or_, tuple_, update from sqlalchemy.exc import NoResultFound, SQLAlchemyError @@ -37,7 +37,7 @@ from sqlalchemy.sql import select from structlog.contextvars import bind_contextvars -from airflow.api_fastapi.common.dagbag import DagBagDep +from airflow.api_fastapi.common.dagbag import dag_bag_from_app from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.types import UtcDateTime from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( @@ -76,6 +76,9 @@ from airflow.models.expandinput import SchedulerExpandInput from airflow.sdk.types import Operator +from airflow.jobs.scheduler_job_runner import SchedulerDagBag + +SchedulerDagBagDep = Annotated[SchedulerDagBag, Depends(dag_bag_from_app)] router = VersionedAPIRouter() @@ -104,7 +107,7 @@ def ti_run( task_instance_id: UUID, ti_run_payload: Annotated[TIEnterRunningPayload, Body()], session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ) -> TIRunContext: """ Run a TaskInstance. @@ -255,7 +258,7 @@ def ti_run( or 0 ) - if dag := dag_bag.get_dag(ti.dag_id): + if dag := dag_bag.get_dag(dag_run=dr, session=session): upstream_map_indexes = dict( _get_upstream_map_indexes(dag.get_task(ti.task_id), ti.map_index, ti.run_id, session) ) @@ -330,7 +333,7 @@ def ti_update_state( task_instance_id: UUID, ti_patch_payload: Annotated[TIStateUpdate, Body()], session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ): """ Update the state of a TaskInstance. @@ -417,8 +420,9 @@ def ti_update_state( ) -def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: DagBagDep) -> None: - ser_dag = dag_bag.get_dag(dag_id) +def _handle_fail_fast_for_dag(ti: TI, dag_id: str, session: SessionDep, dag_bag: SchedulerDagBagDep) -> None: + dr = ti.dag_run + ser_dag = dag_bag.get_dag(dag_run=dr, session=session) if ser_dag and getattr(ser_dag, "fail_fast", False): task_dict = getattr(ser_dag, "task_dict") task_teardown_map = {k: v.is_teardown for k, v in task_dict.items()} @@ -432,7 +436,7 @@ def _create_ti_state_update_query_and_update_state( query: Update, updated_state, session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, dag_id: str, ) -> tuple[Update, TaskInstanceState]: if isinstance(ti_patch_payload, (TITerminalStatePayload, TIRetryStatePayload, TISuccessStatePayload)): @@ -893,7 +897,7 @@ def _get_group_tasks(dag_id: str, task_group_id: str, session: SessionDep, logic def validate_inlets_and_outlets( task_instance_id: UUID, session: SessionDep, - dag_bag: DagBagDep, + dag_bag: SchedulerDagBagDep, ) -> InactiveAssetsResponse: """Validate whether there're inactive assets in inlets and outlets of a given task instance.""" ti_id_str = str(task_instance_id) @@ -911,7 +915,8 @@ def validate_inlets_and_outlets( ) if not ti.task: - dag = dag_bag.get_dag(ti.dag_id) + dr = ti.dag_run + dag = dag_bag.get_dag(dag_run=dr, session=session) if dag: with contextlib.suppress(TaskNotFound): ti.task = dag.get_task(ti.task_id) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 363839a85808c..2006b5f42b959 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -340,6 +340,14 @@ def string_lower_type(val): ), choices=("none", "completed", "failed"), ) +ARG_BACKFILL_RUN_ON_LATEST_VERSION = Arg( + ("--run-on-latest-version",), + help=( + "(Experimental) If set, the backfill will run tasks using the latest bundle version instead of " + "the version that was active when the original Dag run was created." + ), + action="store_true", +) # misc @@ -968,6 +976,7 @@ class GroupCommand(NamedTuple): ARG_RUN_BACKWARDS, ARG_MAX_ACTIVE_RUNS, ARG_BACKFILL_REPROCESS_BEHAVIOR, + ARG_BACKFILL_RUN_ON_LATEST_VERSION, ARG_BACKFILL_DRY_RUN, ), ), diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py b/airflow-core/src/airflow/cli/commands/backfill_command.py index 19108dc4e5412..444bc35aea34a 100644 --- a/airflow-core/src/airflow/cli/commands/backfill_command.py +++ b/airflow-core/src/airflow/cli/commands/backfill_command.py @@ -57,6 +57,7 @@ def create_backfill(args) -> None: reverse=args.run_backwards, dag_run_conf=args.dag_run_conf, reprocess_behavior=reprocess_behavior, + run_on_latest_version=args.run_on_latest_version, ) for k, v in params.items(): console.print(f" - {k} = {v}") @@ -88,4 +89,5 @@ def create_backfill(args) -> None: dag_run_conf=args.dag_run_conf, triggering_user_name=user, reprocess_behavior=reprocess_behavior, + run_on_latest_version=args.run_on_latest_version, ) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 4eb99616133f7..1157c3b51f8df 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -131,15 +131,16 @@ def _get_dag(self, version_id: str, session: Session) -> DAG | None: return dag @staticmethod - def _version_from_dag_run(dag_run, session): - if dag_run.bundle_version: - dag_version = dag_run.created_dag_version - else: + def _version_from_dag_run(dag_run, latest, session): + if latest or not dag_run.bundle_version: dag_version = DagVersion.get_latest_version(dag_id=dag_run.dag_id, session=session) - return dag_version + if dag_version: + return dag_version + + return dag_run.created_dag_version - def get_dag(self, dag_run: DagRun, session: Session) -> DAG | None: - version = self._version_from_dag_run(dag_run=dag_run, session=session) + def get_dag(self, dag_run: DagRun, session: Session, latest=False) -> DAG | None: + version = self._version_from_dag_run(dag_run=dag_run, latest=latest, session=session) if not version: return None return self._get_dag(version_id=version.id, session=session) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index e8eb3f8b5cd0f..e1fd0e506bfbc 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -291,6 +291,7 @@ def _create_backfill_dag_run( dag_run_conf, backfill_sort_ordinal, triggering_user_name, + run_on_latest_version, session, ): from airflow.models.dagrun import DagRun @@ -328,6 +329,7 @@ def _create_backfill_dag_run( info=info, backfill_id=backfill_id, sort_ordinal=backfill_sort_ordinal, + run_on_latest=run_on_latest_version, ) else: session.add( @@ -401,7 +403,7 @@ def _get_info_list( return dagrun_info_list -def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): +def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal, run_on_latest=False): """Clear the existing DAG run and update backfill metadata.""" from sqlalchemy.sql import update @@ -415,6 +417,7 @@ def _handle_clear_run(session, dag, dr, info, backfill_id, sort_ordinal): session=session, confirm_prompt=False, dry_run=False, + run_on_latest_version=run_on_latest, ) # Update backfill_id and run_type in DagRun table @@ -447,6 +450,7 @@ def _create_backfill( dag_run_conf: dict | None, triggering_user_name: str | None, reprocess_behavior: ReprocessBehavior | None = None, + run_on_latest_version: bool = False, ) -> Backfill | None: from airflow.models import DagModel from airflow.models.serialized_dag import SerializedDagModel @@ -510,6 +514,7 @@ def _create_backfill( reprocess_behavior=br.reprocess_behavior, backfill_sort_ordinal=backfill_sort_ordinal, triggering_user_name=br.triggering_user_name, + run_on_latest_version=run_on_latest_version, session=session, ) log.info( diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index a141d96a1c30f..43fc62b098d73 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -1282,6 +1282,7 @@ def clear( only_running: bool = False, confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1299,6 +1300,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: Literal[False] = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1317,6 +1319,7 @@ def clear( only_running: bool = False, confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1335,6 +1338,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: Literal[False] = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1354,6 +1358,7 @@ def clear( confirm_prompt: bool = False, dag_run_state: DagRunState = DagRunState.QUEUED, dry_run: bool = False, + run_on_latest_version: bool = False, session: Session = NEW_SESSION, dag_bag: DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), @@ -1372,6 +1377,7 @@ def clear( :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not be changed. :param dry_run: Find the tasks to clear but don't clear them. + :param run_on_latest_version: whether to run on latest serialized DAG and Bundle version :param session: The sqlalchemy session to use :param dag_bag: The DagBag used to find the dags (Optional) :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``) @@ -1417,6 +1423,7 @@ def clear( list(tis), session, dag_run_state=dag_run_state, + run_on_latest_version=run_on_latest_version, ) else: count = 0 diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index f4ed5573e5993..ca46b4802567a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -203,6 +203,7 @@ def clear_task_instances( tis: list[TaskInstance], session: Session, dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED, + run_on_latest_version: bool = False, ) -> None: """ Clear a set of task instances, but make sure the running ones get killed. @@ -217,6 +218,7 @@ def clear_task_instances( :param session: current session :param dag_run_state: state to set finished DagRuns to. If set to False, DagRuns state will not be changed. + :param run_on_latest_version: whether to run on latest serialized DAG and Bundle version :meta private: """ @@ -234,7 +236,7 @@ def clear_task_instances( ti.state = TaskInstanceState.RESTARTING else: dr = ti.dag_run - ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session) + ti_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, latest=run_on_latest_version) if not ti_dag: log.warning("No serialized dag found for dag '%s'", dr.dag_id) task_id = ti.task_id @@ -277,12 +279,12 @@ def clear_task_instances( if dr.state in State.finished_dr_states: dr.state = dag_run_state dr.start_date = timezone.utcnow() - dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session) + dr_dag = scheduler_dagbag.get_dag(dag_run=dr, session=session, latest=run_on_latest_version) if not dr_dag: log.warning("No serialized dag found for dag '%s'", dr.dag_id) - if dr_dag and not dr_dag.disable_bundle_versioning: + if dr_dag and not dr_dag.disable_bundle_versioning and run_on_latest_version: bundle_version = dr.dag_model.bundle_version - if bundle_version is not None: + if bundle_version is not None and run_on_latest_version: dr.bundle_version = bundle_version if dag_run_state == DagRunState.QUEUED: dr.last_scheduling_decision = None diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index a26e0d4960182..ab484d0801af0 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1205,6 +1205,12 @@ export const $ClearTaskInstancesBody = { type: 'boolean', title: 'Include Past', default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + description: '(Experimental) Run on the latest bundle version of the DAG after clearing the task instances.', + default: false } }, additionalProperties: false, @@ -2205,6 +2211,12 @@ export const $DAGRunClearBody = { type: 'boolean', title: 'Only Failed', default: false + }, + run_on_latest_version: { + type: 'boolean', + title: 'Run On Latest Version', + description: '(Experimental) Run on the latest bundle version of the DAG after clearing the DAG Run.', + default: false } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 17e7de16780f7..be34c576ba1e1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -389,6 +389,10 @@ export type ClearTaskInstancesBody = { include_downstream?: boolean; include_future?: boolean; include_past?: boolean; + /** + * (Experimental) Run on the latest bundle version of the DAG after clearing the task instances. + */ + run_on_latest_version?: boolean; }; /** @@ -594,6 +598,10 @@ export type DAGResponse = { export type DAGRunClearBody = { dry_run?: boolean; only_failed?: boolean; + /** + * (Experimental) Run on the latest bundle version of the DAG after clearing the DAG Run. + */ + run_on_latest_version?: boolean; }; /** diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json index d1ee6a3d0eeb7..6705e10196206 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/dags.json @@ -62,6 +62,7 @@ "onlyFailed": "Clear only failed tasks", "past": "Past", "queueNew": "Queue up new tasks", + "runOnLatestVersion": "Run with latest bundle version", "upstream": "Upstream" } }, diff --git a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx index 696ada0ee3815..3b3c9fe5f0eb3 100644 --- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx +++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx @@ -21,9 +21,10 @@ import { useState } from "react"; import { useTranslation } from "react-i18next"; import { CgRedo } from "react-icons/cg"; +import { useDagServiceGetDagDetails } from "openapi/queries"; import type { DAGRunResponse } from "openapi/requests/types.gen"; import { ActionAccordion } from "src/components/ActionAccordion"; -import { Button, Dialog } from "src/components/ui"; +import { Button, Dialog, Checkbox } from "src/components/ui"; import SegmentedControl from "src/components/ui/SegmentedControl"; import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun"; import { useClearDagRun } from "src/queries/useClearRun"; @@ -43,11 +44,17 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { const [note, setNote] = useState(dagRun.note); const [selectedOptions, setSelectedOptions] = useState>(["existingTasks"]); const onlyFailed = selectedOptions.includes("onlyFailed"); + const [runOnLatestVersion, setRunOnLatestVersion] = useState(false); + + // Get current DAG's bundle version to compare with DAG run's bundle version + const { data: dagDetails } = useDagServiceGetDagDetails({ + dagId, + }); const { data: affectedTasks = { task_instances: [], total_entries: 0 } } = useClearDagRunDryRun({ dagId, dagRunId, - requestBody: { only_failed: onlyFailed }, + requestBody: { only_failed: onlyFailed, run_on_latest_version: runOnLatestVersion }, }); const { isPending, mutate } = useClearDagRun({ @@ -62,6 +69,13 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { onSuccess: onClose, }); + // Check if bundle versions are different + const currentDagBundleVersion = dagDetails?.bundle_version; + const dagRunBundleVersion = dagRun.bundle_version; + const bundleVersionsDiffer = currentDagBundleVersion !== dagRunBundleVersion; + const shouldShowBundleVersionOption = + bundleVersionsDiffer && dagRunBundleVersion !== null && dagRunBundleVersion !== ""; + return ( @@ -101,7 +115,19 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => { /> - + + {shouldShowBundleVersionOption ? ( + setRunOnLatestVersion(Boolean(event.checked))} + > + {translate("dags:runAndTaskActions.options.runOnLatestVersion")} + + ) : undefined}