diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index be04063907a9d..4926f28693e8e 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -56,3 +56,16 @@ class BackfillCollectionResponse(BaseModel): backfills: list[BackfillResponse] total_entries: int + + +class DryRunBackfillResponse(BaseModel): + """Data model for run information during a backfill operation.""" + + logical_date: datetime + + +class DryRunBackfillCollectionResponse(BaseModel): + """Serializer for responses in dry-run mode for backfill operations.""" + + backfills: list[DryRunBackfillResponse] + total_entries: int diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index a1177ec24096e..9a189e5f9a1e3 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1481,6 +1481,55 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/backfills/dry_run: + post: + tags: + - Backfill + summary: Create Backfill Dry Run + operationId: create_backfill_dry_run + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/BackfillPostBody' + required: true + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/DryRunBackfillCollectionResponse' + '401': + description: Unauthorized + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '403': + description: Forbidden + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '404': + description: Not Found + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '409': + description: Conflict + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/connections/{connection_id}: delete: tags: @@ -7912,6 +7961,33 @@ components: This is the set of allowable values for the ``warning_type`` field in the DagWarning model.' + DryRunBackfillCollectionResponse: + properties: + backfills: + items: + $ref: '#/components/schemas/DryRunBackfillResponse' + type: array + title: Backfills + total_entries: + type: integer + title: Total Entries + type: object + required: + - backfills + - total_entries + title: DryRunBackfillCollectionResponse + description: Serializer for responses in dry-run mode for backfill operations. + DryRunBackfillResponse: + properties: + logical_date: + type: string + format: date-time + title: Logical Date + type: object + required: + - logical_date + title: DryRunBackfillResponse + description: Data model for run information during a backfill operation. EdgeResponse: properties: is_setup_teardown: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 61d25597cdd15..931ad167e9e10 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -32,6 +32,8 @@ BackfillCollectionResponse, BackfillPostBody, BackfillResponse, + DryRunBackfillCollectionResponse, + DryRunBackfillResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, @@ -206,3 +208,40 @@ def create_backfill( status_code=status.HTTP_409_CONFLICT, detail=f"There is already a running backfill for dag {backfill_request.dag_id}", ) + + +@backfills_router.post( + path="/dry_run", + responses=create_openapi_http_exception_doc( + [ + status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, + ] + ), +) +def create_backfill_dry_run( + body: BackfillPostBody, +) -> DryRunBackfillCollectionResponse: + from_date = timezone.coerce_datetime(body.from_date) + to_date = timezone.coerce_datetime(body.to_date) + + try: + backfills_dry_run = _create_backfill( + dag_id=body.dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=body.max_active_runs, + reverse=body.run_backwards, + dag_run_conf=body.dag_run_conf, + reprocess_behavior=body.reprocess_behavior, + dry_run=True, + ) + backfills = [DryRunBackfillResponse(logical_date=logical_date) for logical_date in backfills_dry_run] + + return DryRunBackfillCollectionResponse(backfills=backfills, total_entries=len(backfills_dry_run)) + + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail="There is already a running backfill for the dag", + ) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 0e88fa15bb04f..c9158bbad240d 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -25,18 +25,9 @@ import logging from enum import Enum -from typing import TYPE_CHECKING - -from sqlalchemy import ( - Boolean, - Column, - ForeignKeyConstraint, - Integer, - UniqueConstraint, - desc, - func, - select, -) +from typing import TYPE_CHECKING, overload + +from sqlalchemy import Boolean, Column, ForeignKeyConstraint, Integer, UniqueConstraint, case, func, select from sqlalchemy.orm import relationship, validates from sqlalchemy_jsonfield import JSONField @@ -47,13 +38,15 @@ from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks +from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType if TYPE_CHECKING: from datetime import datetime + from typing_extensions import Literal + log = logging.getLogger(__name__) @@ -158,72 +151,125 @@ def validate_sort_ordinal(self, key, val): def _create_backfill_dag_run( *, dag, - info, - reprocess_behavior: ReprocessBehavior, + dagrun_info_list, + reprocess_behavior: ReprocessBehavior | None = None, backfill_id, - dag_run_conf, - backfill_sort_ordinal, + dag_run_conf: dict | None, session, -): + dry_run, +) -> list[datetime]: from airflow.models import DagRun - with session.begin_nested() as nested: - dr = session.scalar( - with_row_locks( - select(DagRun) - .where(DagRun.logical_date == info.logical_date) - .order_by(nulls_first(desc(DagRun.start_date), session=session)) - .limit(1), - session=session, + backfill_sort_ordinal = 0 + logical_dates = [] + dagrun_infos = list(dagrun_info_list) + + if reprocess_behavior is None: + reprocess_behavior = ReprocessBehavior.NONE + if dag_run_conf is None: + dag_run_conf = {} + + dag_run_ranked = ( + select( + DagRun.logical_date, + DagRun.start_date, + DagRun.dag_id, + func.row_number() + .over( + partition_by=DagRun.logical_date, + order_by=(case([(DagRun.start_date.is_(None), 0)], else_=1), DagRun.start_date.desc()), ) + .label("row_number"), ) - if dr: - non_create_reason = None - if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): - non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT - elif reprocess_behavior is ReprocessBehavior.NONE: - non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - elif reprocess_behavior is ReprocessBehavior.FAILED: - if dr.state != DagRunState.FAILED: + .where(DagRun.dag_id == dag.dag_id) + .where(DagRun.logical_date.in_([info.logical_date for info in dagrun_infos])) + .subquery() + ) + + existing_dag_runs = { + dr.logical_date: dr + for dr in session.scalars( + select(DagRun) + .join( + dag_run_ranked, + (DagRun.logical_date == dag_run_ranked.c.logical_date) + & ( + (DagRun.start_date == dag_run_ranked.c.start_date) + | ((DagRun.start_date.is_(None)) & (dag_run_ranked.c.start_date.is_(None))) + ) + & (DagRun.dag_id == dag_run_ranked.c.dag_id), + ) + .where(dag_run_ranked.c.row_number == 1) + ).all() + } + + print(existing_dag_runs) + + for info in dagrun_infos: + backfill_sort_ordinal += 1 + dr = existing_dag_runs.get(info.logical_date) + non_create_reason = None + + with session.begin_nested() as nested: + if dr: + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif reprocess_behavior is ReprocessBehavior.NONE: non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS - if non_create_reason: - # rolling back here restores to start of this nested tran - # which releases the lock on the latest dag run, since we - # are not creating a new one - nested.rollback() + elif reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + + if non_create_reason: + if not dry_run: + nested.rollback() + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=dr.logical_date, + exception_reason=non_create_reason, + sort_ordinal=backfill_sort_ordinal, + ) + ) + else: + logical_dates.append(dr.logical_date) + else: + logical_dates.append(info.logical_date) + + if not non_create_reason and not dry_run: + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + dr = dag.create_dagrun( + triggered_by=DagRunTriggeredByType.BACKFILL, + logical_date=info.logical_date, + data_interval=info.data_interval, + start_date=timezone.utcnow(), + state=DagRunState.QUEUED, + external_trigger=False, + conf=dag_run_conf, + run_type=DagRunType.BACKFILL_JOB, + creating_job_id=None, + session=session, + backfill_id=backfill_id, + dag_version=dag_version, + ) session.add( BackfillDagRun( backfill_id=backfill_id, - dag_run_id=None, - logical_date=info.logical_date, - exception_reason=non_create_reason, + dag_run_id=dr.id, sort_ordinal=backfill_sort_ordinal, + logical_date=info.logical_date, ) ) - return - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) - dr = dag.create_dagrun( - triggered_by=DagRunTriggeredByType.BACKFILL, - logical_date=info.logical_date, - data_interval=info.data_interval, - start_date=timezone.utcnow(), - state=DagRunState.QUEUED, - external_trigger=False, - conf=dag_run_conf, - run_type=DagRunType.BACKFILL_JOB, - creating_job_id=None, - session=session, - backfill_id=backfill_id, - dag_version=dag_version, - ) - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=dr.id, - sort_ordinal=backfill_sort_ordinal, - logical_date=info.logical_date, - ) - ) + + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + backfill_id, + info, + ) + + return logical_dates def _get_info_list( @@ -241,6 +287,34 @@ def _get_info_list( return dagrun_info_list +@overload +def _create_backfill( + *, + dag_id: str, + from_date: datetime, + to_date: datetime, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + reprocess_behavior: ReprocessBehavior | None = ..., + dry_run: Literal[True], +) -> list[datetime]: ... + + +@overload +def _create_backfill( + *, + dag_id: str, + from_date: datetime, + to_date: datetime, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + reprocess_behavior: ReprocessBehavior | None = ..., + dry_run: Literal[False] | None = ..., +) -> Backfill | None: ... + + def _create_backfill( *, dag_id: str, @@ -250,7 +324,8 @@ def _create_backfill( reverse: bool, dag_run_conf: dict | None, reprocess_behavior: ReprocessBehavior | None = None, -) -> Backfill | None: + dry_run: bool | None = False, +) -> Backfill | list[datetime] | None: from airflow.models import DagModel from airflow.models.serialized_dag import SerializedDagModel @@ -284,18 +359,23 @@ def _create_backfill( "You must set reprocess behavior to reprocess completed or " "reprocess failed" ) - br = Backfill( - dag_id=dag_id, - from_date=from_date, - to_date=to_date, - max_active_runs=max_active_runs, - dag_run_conf=dag_run_conf, - reprocess_behavior=reprocess_behavior, - ) - session.add(br) - session.commit() - backfill_sort_ordinal = 0 + backfill_id = None + + if not dry_run: + br = Backfill( + dag_id=dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=max_active_runs, + dag_run_conf=dag_run_conf, + reprocess_behavior=reprocess_behavior, + ) + + session.add(br) + session.commit() + + backfill_id = br.id dagrun_info_list = _get_info_list( from_date=from_date, @@ -316,21 +396,15 @@ def _create_backfill( ) if not dag_model: raise RuntimeError(f"Dag {dag_id} not found") - for info in dagrun_info_list: - backfill_sort_ordinal += 1 - _create_backfill_dag_run( - dag=dag, - info=info, - backfill_id=br.id, - dag_run_conf=br.dag_run_conf, - reprocess_behavior=br.reprocess_behavior, - backfill_sort_ordinal=backfill_sort_ordinal, - session=session, - ) - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - br.id, - info, - ) - return br + + backfill_response = _create_backfill_dag_run( + dag=dag, + dagrun_info_list=dagrun_info_list, + backfill_id=backfill_id, + dag_run_conf=dag_run_conf, + reprocess_behavior=reprocess_behavior, + session=session, + dry_run=dry_run, + ) + + return br if not dry_run else backfill_response diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index eef4fca5b8331..d7f5bbd46d4a4 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1740,6 +1740,9 @@ export type AssetServiceCreateAssetEventMutationResult = Awaited< export type BackfillServiceCreateBackfillMutationResult = Awaited< ReturnType >; +export type BackfillServiceCreateBackfillDryRunMutationResult = Awaited< + ReturnType +>; export type ConnectionServicePostConnectionMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index ddec28ddd618e..1cd74e5c2ca00 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2899,6 +2899,44 @@ export const useBackfillServiceCreateBackfill = < }) as unknown as Promise, ...options, }); +/** + * Create Backfill Dry Run + * @param data The data for the request. + * @param data.requestBody + * @returns DryRunBackfillCollectionResponse Successful Response + * @throws ApiError + */ +export const useBackfillServiceCreateBackfillDryRun = < + TData = Common.BackfillServiceCreateBackfillDryRunMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + requestBody: BackfillPostBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + requestBody: BackfillPostBody; + }, + TContext + >({ + mutationFn: ({ requestBody }) => + BackfillService.createBackfillDryRun({ + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Post Connection * Create connection entry. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 6d87d20de1016..2d9cade6e3085 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -2643,6 +2643,41 @@ This is the set of allowable values for the \`\`warning_type\`\` field in the DagWarning model.`, } as const; +export const $DryRunBackfillCollectionResponse = { + properties: { + backfills: { + items: { + $ref: "#/components/schemas/DryRunBackfillResponse", + }, + type: "array", + title: "Backfills", + }, + total_entries: { + type: "integer", + title: "Total Entries", + }, + }, + type: "object", + required: ["backfills", "total_entries"], + title: "DryRunBackfillCollectionResponse", + description: + "Serializer for responses in dry-run mode for backfill operations.", +} as const; + +export const $DryRunBackfillResponse = { + properties: { + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + }, + type: "object", + required: ["logical_date"], + title: "DryRunBackfillResponse", + description: "Data model for run information during a backfill operation.", +} as const; + export const $EdgeResponse = { properties: { is_setup_teardown: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 9100a65d0d185..475de297a3a47 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -54,6 +54,8 @@ import type { UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, + CreateBackfillDryRunData, + CreateBackfillDryRunResponse, GridDataData, GridDataResponse, DeleteConnectionData, @@ -964,6 +966,31 @@ export class BackfillService { }, }); } + + /** + * Create Backfill Dry Run + * @param data The data for the request. + * @param data.requestBody + * @returns DryRunBackfillCollectionResponse Successful Response + * @throws ApiError + */ + public static createBackfillDryRun( + data: CreateBackfillDryRunData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/backfills/dry_run", + body: data.requestBody, + mediaType: "application/json", + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 409: "Conflict", + 422: "Validation Error", + }, + }); + } } export class GridService { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 36ecc53863f19..4f8e5a0900443 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -640,6 +640,21 @@ export type DagTagResponse = { */ export type DagWarningType = "asset conflict" | "non-existent pool"; +/** + * Serializer for responses in dry-run mode for backfill operations. + */ +export type DryRunBackfillCollectionResponse = { + backfills: Array; + total_entries: number; +}; + +/** + * Data model for run information during a backfill operation. + */ +export type DryRunBackfillResponse = { + logical_date: string; +}; + /** * Edge serializer for responses. */ @@ -1579,6 +1594,12 @@ export type CancelBackfillData = { export type CancelBackfillResponse = BackfillResponse; +export type CreateBackfillDryRunData = { + requestBody: BackfillPostBody; +}; + +export type CreateBackfillDryRunResponse = DryRunBackfillCollectionResponse; + export type GridDataData = { dagId: string; includeDownstream?: boolean; @@ -2852,6 +2873,37 @@ export type $OpenApiTs = { }; }; }; + "/public/backfills/dry_run": { + post: { + req: CreateBackfillDryRunData; + res: { + /** + * Successful Response + */ + 200: DryRunBackfillCollectionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Conflict + */ + 409: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; + }; "/ui/grid/{dag_id}": { get: { req: GridDataData; diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 1c64b10848f4b..4f0ae7918e41d 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -192,6 +192,7 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl "max_active_runs": max_active_runs, "run_backwards": False, "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, } if repro_act is not None: data["reprocess_behavior"] = repro_act @@ -215,6 +216,93 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl } +class TestCreateBackfillDryRun(TestBackfillEndpoint): + @pytest.mark.parametrize( + "reprocess_behavior, expected_dates", + [ + ( + "none", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ( + "failed", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-03T00:00:00Z"}, # Reprocess failed + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ( + "completed", + [ + {"logical_date": "2024-01-01T00:00:00Z"}, + {"logical_date": "2024-01-02T00:00:00Z"}, # Reprocess all + {"logical_date": "2024-01-03T00:00:00Z"}, + {"logical_date": "2024-01-04T00:00:00Z"}, + {"logical_date": "2024-01-05T00:00:00Z"}, + ], + ), + ], + ) + def test_create_backfill_dry_run( + self, session, dag_maker, test_client, reprocess_behavior, expected_dates + ): + with dag_maker( + session=session, + dag_id="TEST_DAG_2", + schedule="0 0 * * *", + start_date=pendulum.parse("2024-01-01"), + ) as dag: + EmptyOperator(task_id="mytask") + + session.commit() + + existing_dagruns = [ + {"logical_date": pendulum.parse("2024-01-02"), "state": DagRunState.SUCCESS}, # Completed dag run + {"logical_date": pendulum.parse("2024-01-03"), "state": DagRunState.FAILED}, # Failed dag run + ] + for dagrun in existing_dagruns: + session.add( + DagRun( + dag_id=dag.dag_id, + run_id=f"manual__{dagrun['logical_date'].isoformat()}", + logical_date=dagrun["logical_date"], + state=dagrun["state"], + run_type="scheduled", + ) + ) + session.commit() + + from_date = pendulum.parse("2024-01-01") + from_date_iso = to_iso(from_date) + to_date = pendulum.parse("2024-01-05") + to_date_iso = to_iso(to_date) + + data = { + "dag_id": dag.dag_id, + "from_date": from_date_iso, + "to_date": to_date_iso, + "max_active_runs": 5, + "run_backwards": False, + "dag_run_conf": {"param1": "val1", "param2": True}, + "reprocess_behavior": reprocess_behavior, + } + + response = test_client.post( + url="/public/backfills/dry_run", + json=data, + ) + + assert response.status_code == 200 + response_json = response.json() + assert response_json["backfills"] == expected_dates + + class TestCancelBackfill(TestBackfillEndpoint): def test_cancel_backfill(self, session, test_client): (dag,) = self._create_dag_models()