Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,10 @@ def validate_new_state(cls, ns: str | None) -> str:
if ns not in valid_states:
raise ValueError(f"'{ns}' is not one of {valid_states}")
return ns


class BulkTaskInstanceBody(PatchTaskInstanceBody, StrictBaseModel):
"""Request body for bulk update, and delete task instances."""

task_id: str
map_index: int | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -5568,6 +5568,58 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
patch:
tags:
- Task Instance
summary: Bulk Task Instances
description: Bulk update, and delete task instances.
operationId: bulk_task_instances
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
- name: dag_run_id
in: path
required: true
schema:
type: string
title: Dag Run Id
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/BulkBody_BulkTaskInstanceBody_'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/BulkResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/list:
post:
tags:
Expand Down Expand Up @@ -7389,6 +7441,21 @@ components:

This structure helps users understand which key actions succeeded and which
failed.'
BulkBody_BulkTaskInstanceBody_:
properties:
actions:
items:
oneOf:
- $ref: '#/components/schemas/BulkCreateAction_BulkTaskInstanceBody_'
- $ref: '#/components/schemas/BulkUpdateAction_BulkTaskInstanceBody_'
- $ref: '#/components/schemas/BulkDeleteAction_BulkTaskInstanceBody_'
type: array
title: Actions
additionalProperties: false
type: object
required:
- actions
title: BulkBody[BulkTaskInstanceBody]
BulkBody_ConnectionBody_:
properties:
actions:
Expand Down Expand Up @@ -7434,6 +7501,28 @@ components:
required:
- actions
title: BulkBody[VariableBody]
BulkCreateAction_BulkTaskInstanceBody_:
properties:
action:
type: string
const: create
title: Action
description: The action to be performed on the entities.
entities:
items:
$ref: '#/components/schemas/BulkTaskInstanceBody'
type: array
title: Entities
description: A list of entities to be created.
action_on_existence:
$ref: '#/components/schemas/BulkActionOnExistence'
default: fail
additionalProperties: false
type: object
required:
- action
- entities
title: BulkCreateAction[BulkTaskInstanceBody]
BulkCreateAction_ConnectionBody_:
properties:
action:
Expand Down Expand Up @@ -7500,6 +7589,28 @@ components:
- action
- entities
title: BulkCreateAction[VariableBody]
BulkDeleteAction_BulkTaskInstanceBody_:
properties:
action:
type: string
const: delete
title: Action
description: The action to be performed on the entities.
entities:
items:
type: string
type: array
title: Entities
description: A list of entity id/key to be deleted.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
additionalProperties: false
type: object
required:
- action
- entities
title: BulkDeleteAction[BulkTaskInstanceBody]
BulkDeleteAction_ConnectionBody_:
properties:
action:
Expand Down Expand Up @@ -7599,6 +7710,70 @@ components:

Fields are populated in the response only if the respective action was part
of the request, else are set None.'
BulkTaskInstanceBody:
properties:
new_state:
anyOf:
- $ref: '#/components/schemas/TaskInstanceState'
- type: 'null'
note:
anyOf:
- type: string
maxLength: 1000
- type: 'null'
title: Note
include_upstream:
type: boolean
title: Include Upstream
default: false
include_downstream:
type: boolean
title: Include Downstream
default: false
include_future:
type: boolean
title: Include Future
default: false
include_past:
type: boolean
title: Include Past
default: false
task_id:
type: string
title: Task Id
map_index:
anyOf:
- type: integer
- type: 'null'
title: Map Index
additionalProperties: false
type: object
required:
- task_id
title: BulkTaskInstanceBody
description: Request body for bulk update, and delete task instances.
BulkUpdateAction_BulkTaskInstanceBody_:
properties:
action:
type: string
const: update
title: Action
description: The action to be performed on the entities.
entities:
items:
$ref: '#/components/schemas/BulkTaskInstanceBody'
type: array
title: Entities
description: A list of entities to be updated.
action_on_non_existence:
$ref: '#/components/schemas/BulkActionNotOnExistence'
default: fail
additionalProperties: false
type: object
required:
- action
- entities
title: BulkUpdateAction[BulkTaskInstanceBody]
BulkUpdateAction_ConnectionBody_:
properties:
action:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import structlog
from fastapi import Depends, HTTPException, Query, status
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from sqlalchemy import or_, select
from sqlalchemy.orm import joinedload
from sqlalchemy.sql.selectable import Select
Expand Down Expand Up @@ -51,7 +49,9 @@
float_range_filter_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse
from airflow.api_fastapi.core_api.datamodels.task_instances import (
BulkTaskInstanceBody,
ClearTaskInstancesBody,
PatchTaskInstanceBody,
TaskDependencyCollectionResponse,
Expand All @@ -63,11 +63,14 @@
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.services.public.task_instances import (
BulkTaskInstanceService,
_patch_ti_validate_request,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models import Base, DagRun
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
from airflow.ti_deps.dep_context import DepContext
Expand Down Expand Up @@ -728,54 +731,6 @@ def post_clear_task_instances(
)


def _patch_ti_validate_request(
dag_id: str,
dag_run_id: str,
task_id: str,
dag_bag: DagBagDep,
body: PatchTaskInstanceBody,
session: SessionDep,
map_index: int | None = -1,
update_mask: list[str] | None = Query(None),
) -> tuple[DAG, list[TI], dict]:
dag = dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"DAG {dag_id} not found")

if not dag.has_task(task_id):
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task '{task_id}' not found in DAG '{dag_id}'")

query = (
select(TI)
.where(TI.dag_id == dag_id, TI.run_id == dag_run_id, TI.task_id == task_id)
.join(TI.dag_run)
.options(joinedload(TI.rendered_task_instance_fields))
)
if map_index is not None:
query = query.where(TI.map_index == map_index)
else:
query = query.order_by(TI.map_index)

tis = session.scalars(query).all()

err_msg_404 = (
f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
)
if len(tis) == 0:
raise HTTPException(status.HTTP_404_NOT_FOUND, err_msg_404)

fields_to_update = body.model_fields_set
if update_mask:
fields_to_update = fields_to_update.intersection(update_mask)
else:
try:
PatchTaskInstanceBody.model_validate(body)
except ValidationError as e:
raise RequestValidationError(errors=e.errors())

return dag, list(tis), body.model_dump(include=fields_to_update, by_alias=True)


@task_instances_router.patch(
task_instances_prefix + "/{task_id}/dry_run",
responses=create_openapi_http_exception_doc(
Expand Down Expand Up @@ -835,6 +790,24 @@ def patch_task_instance_dry_run(
)


@task_instances_router.patch(
task_instances_prefix,
dependencies=[Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE))],
)
def bulk_task_instances(
request: BulkBody[BulkTaskInstanceBody],
session: SessionDep,
dag_id: str,
dag_bag: DagBagDep,
dag_run_id: str,
user: GetUserDep,
) -> BulkResponse:
"""Bulk update, and delete task instances."""
return BulkTaskInstanceService(
session=session, request=request, dag_id=dag_id, dag_run_id=dag_run_id, dag_bag=dag_bag, user=user
).handle_request()


@task_instances_router.patch(
task_instances_prefix + "/{task_id}",
responses=create_openapi_http_exception_doc(
Expand Down
Loading
Loading