Skip to content

Commit

Permalink
AIP-84 Add Lists Jobs with Filters API (#43859)
Browse files Browse the repository at this point in the history
* AIP-84 Add List Jobs with filters API

* Refactor job filter params

* Refactor test with zulu format datetime utils

* Remove 401, 403 status code

* Fix TestGetEventLog logical_date datetime format

* Fix the trailing endpoint naming

* Fix paginated_select mypy fail
  • Loading branch information
jason810496 authored Nov 25, 2024
1 parent 05f935d commit acf106b
Show file tree
Hide file tree
Showing 16 changed files with 985 additions and 3 deletions.
54 changes: 54 additions & 0 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from sqlalchemy.inspection import inspect

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.jobs.job import Job
from airflow.models import Base, Connection
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
Expand Down Expand Up @@ -449,6 +450,54 @@ def depends(self, tag_name_pattern: str | None = None) -> _DagTagNamePatternSear
return self.set_value(tag_name_pattern)


class _JobTypeFilter(BaseParam[str]):
    """Equality filter on ``Job.job_type``."""

    def depends(self, job_type: str | None = None) -> _JobTypeFilter:
        """Record the ``job_type`` query parameter through ``set_value`` and return its result."""
        return self.set_value(job_type)

    def to_orm(self, select: Select) -> Select:
        """
        Add a ``Job.job_type == value`` clause to ``select``.

        The statement is handed back untouched when no value was supplied and
        ``skip_none`` is set.
        """
        nothing_to_filter = self.value is None and self.skip_none
        return select if nothing_to_filter else select.where(Job.job_type == self.value)


class _JobStateFilter(BaseParam[str]):
    """Equality filter on ``Job.state``, fed by the ``job_state`` query parameter."""

    def depends(self, job_state: str | None = None) -> _JobStateFilter:
        """Record the ``job_state`` query parameter through ``set_value`` and return its result."""
        return self.set_value(job_state)

    def to_orm(self, select: Select) -> Select:
        """
        Add a ``Job.state == value`` clause to ``select``.

        The statement is handed back untouched when no value was supplied and
        ``skip_none`` is set.
        """
        nothing_to_filter = self.value is None and self.skip_none
        return select if nothing_to_filter else select.where(Job.state == self.value)


class _JobHostnameFilter(BaseParam[str]):
    """Equality filter on ``Job.hostname``."""

    def depends(self, hostname: str | None = None) -> _JobHostnameFilter:
        """Record the ``hostname`` query parameter through ``set_value`` and return its result."""
        return self.set_value(hostname)

    def to_orm(self, select: Select) -> Select:
        """
        Add a ``Job.hostname == value`` clause to ``select``.

        The statement is handed back untouched when no value was supplied and
        ``skip_none`` is set.
        """
        nothing_to_filter = self.value is None and self.skip_none
        return select if nothing_to_filter else select.where(Job.hostname == self.value)


class _JobExecutorClassFilter(BaseParam[str]):
    """Equality filter on ``Job.executor_class``."""

    def depends(self, executor_class: str | None = None) -> _JobExecutorClassFilter:
        """Record the ``executor_class`` query parameter through ``set_value`` and return its result."""
        return self.set_value(executor_class)

    def to_orm(self, select: Select) -> Select:
        """
        Add a ``Job.executor_class == value`` clause to ``select``.

        The statement is handed back untouched when no value was supplied and
        ``skip_none`` is set.
        """
        nothing_to_filter = self.value is None and self.skip_none
        return select if nothing_to_filter else select.where(Job.executor_class == self.value)


def _safe_parse_datetime(date_to_check: str) -> datetime:
"""
Parse datetime and raise error for invalid dates.
Expand Down Expand Up @@ -718,6 +767,11 @@ def depends_float(
QueryTIPoolFilter = Annotated[TIPoolFilter, Depends(TIPoolFilter().depends)]
QueryTIQueueFilter = Annotated[TIQueueFilter, Depends(TIQueueFilter().depends)]
QueryTIExecutorFilter = Annotated[TIExecutorFilter, Depends(TIExecutorFilter().depends)]
# Job
# Query-parameter dependencies for the job filters: each alias pairs a filter
# class with the bound ``depends`` method of a fresh instance via ``Depends``.
QueryJobTypeFilter = Annotated[_JobTypeFilter, Depends(_JobTypeFilter().depends)]
QueryJobStateFilter = Annotated[_JobStateFilter, Depends(_JobStateFilter().depends)]
QueryJobHostnameFilter = Annotated[_JobHostnameFilter, Depends(_JobHostnameFilter().depends)]
QueryJobExecutorClassFilter = Annotated[_JobExecutorClassFilter, Depends(_JobExecutorClassFilter().depends)]

# Assets
QueryUriPatternSearch = Annotated[_UriPatternSearch, Depends(_UriPatternSearch().depends)]
Expand Down
7 changes: 7 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ class JobResponse(BaseModel):
executor_class: datetime | None
hostname: str | None
unixname: str | None


class JobCollectionResponse(BaseModel):
    """Job Collection Response."""

    # NOTE: the docstring above appears verbatim as this schema's description in
    # the generated OpenAPI spec, so rewording it changes the published spec.

    # Serialized jobs contained in this response.
    jobs: list[JobResponse]
    # Entry count reported with the collection; presumably the number of rows
    # matching the query rather than the page size -- confirm against the route.
    total_entries: int
154 changes: 154 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3000,6 +3000,144 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/jobs:
get:
tags:
- Job
summary: Get Jobs
description: Get all jobs.
operationId: get_jobs
parameters:
- name: is_alive
in: query
required: false
schema:
anyOf:
- type: boolean
- type: 'null'
title: Is Alive
- name: start_date_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date Gte
- name: start_date_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Start Date Lte
- name: end_date_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date Gte
- name: end_date_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: End Date Lte
- name: limit
in: query
required: false
schema:
type: integer
minimum: 0
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
minimum: 0
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: id
title: Order By
- name: job_state
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Job State
- name: job_type
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Job Type
- name: hostname
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Hostname
- name: executor_class
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Executor Class
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/JobCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/plugins:
get:
tags:
Expand Down Expand Up @@ -6708,6 +6846,22 @@ components:
- stack_trace
title: ImportErrorResponse
description: Import Error Response.
JobCollectionResponse:
properties:
jobs:
items:
$ref: '#/components/schemas/JobResponse'
type: array
title: Jobs
total_entries:
type: integer
title: Total Entries
type: object
required:
- jobs
- total_entries
title: JobCollectionResponse
description: Job Collection Response.
JobResponse:
properties:
id:
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from airflow.api_fastapi.core_api.routes.public.dags import dags_router
from airflow.api_fastapi.core_api.routes.public.event_logs import event_logs_router
from airflow.api_fastapi.core_api.routes.public.import_error import import_error_router
from airflow.api_fastapi.core_api.routes.public.job import job_router
from airflow.api_fastapi.core_api.routes.public.log import task_instances_log_router
from airflow.api_fastapi.core_api.routes.public.monitor import monitor_router
from airflow.api_fastapi.core_api.routes.public.plugins import plugins_router
Expand Down Expand Up @@ -61,6 +62,7 @@
authenticated_router.include_router(dags_router)
authenticated_router.include_router(event_logs_router)
authenticated_router.include_router(import_error_router)
authenticated_router.include_router(job_router)
authenticated_router.include_router(plugins_router)
authenticated_router.include_router(pools_router)
authenticated_router.include_router(providers_router)
Expand Down
127 changes: 127 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from fastapi import Depends, status
from sqlalchemy import select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import (
get_session,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
QueryJobExecutorClassFilter,
QueryJobHostnameFilter,
QueryJobStateFilter,
QueryJobTypeFilter,
QueryLimit,
QueryOffset,
RangeFilter,
SortParam,
datetime_range_filter_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.job import (
JobCollectionResponse,
JobResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.jobs.job import Job
from airflow.utils.state import JobState

# Router for the job endpoints: created with the "/jobs" prefix and the "Job" tag.
job_router = AirflowRouter(tags=["Job"], prefix="/jobs")


@job_router.get(
    "",
    responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST]),
)
def get_jobs(
    start_date_range: Annotated[
        RangeFilter,
        Depends(datetime_range_filter_factory("start_date", Job)),
    ],
    end_date_range: Annotated[
        RangeFilter,
        Depends(datetime_range_filter_factory("end_date", Job)),
    ],
    limit: QueryLimit,
    offset: QueryOffset,
    order_by: Annotated[
        SortParam,
        Depends(
            SortParam(
                [
                    "id",
                    "dag_id",
                    "state",
                    "job_type",
                    "start_date",
                    "end_date",
                    "latest_heartbeat",
                    "executor_class",
                    "hostname",
                    "unixname",
                ],
                Job,
            ).dynamic_depends(default="id")
        ),
    ],
    session: Annotated[Session, Depends(get_session)],
    state: QueryJobStateFilter,
    job_type: QueryJobTypeFilter,
    hostname: QueryJobHostnameFilter,
    executor_class: QueryJobExecutorClassFilter,
    is_alive: bool | None = None,
) -> JobCollectionResponse:
    """Get all jobs."""
    # The docstring is intentionally left as-is: it is published as the
    # operation description in the generated OpenAPI spec.
    #
    # Parameters:
    #   start_date_range / end_date_range: range filters built for
    #       ``Job.start_date`` and ``Job.end_date``.
    #   limit / offset / order_by: pagination and sorting; sorting is restricted
    #       to the column names listed in the ``SortParam`` above, default "id".
    #   state / job_type / hostname / executor_class: optional equality filters.
    #   is_alive: when given, keep only jobs whose ``Job.is_alive()`` result
    #       equals this flag; when omitted, no liveness filtering is done.
    # Returns a ``JobCollectionResponse`` holding the serialized jobs together
    # with the total reported by ``paginated_select``.

    # NOTE(review): the base statement is pinned to RUNNING jobs. The ``state``
    # filter presumably adds a second ``Job.state == ...`` condition on top of
    # it, so any other requested state would match nothing -- confirm intent.
    base_select = select(Job).where(Job.state == JobState.RUNNING).order_by(Job.latest_heartbeat.desc())
    # TODO: Refactor using the `FilterParam` class in commit `574b72e41cc5ed175a2bbf4356522589b836bb11`

    jobs_select, total_entries = paginated_select(
        select=base_select,
        filters=[
            start_date_range,
            end_date_range,
            state,
            job_type,
            hostname,
            executor_class,
        ],
        order_by=order_by,
        limit=limit,
        offset=offset,
        session=session,
        return_total_entries=True,
    )
    jobs = session.scalars(jobs_select).all()

    if is_alive is not None:
        # Compare against the requested flag: previously any non-null value
        # (including ``false``) kept only the alive jobs.
        # NOTE(review): this filtering happens in Python after pagination, so
        # ``total_entries`` (computed above) can exceed the number of jobs
        # actually returned.
        jobs = [job for job in jobs if job.is_alive() == is_alive]

    return JobCollectionResponse(
        jobs=[
            JobResponse.model_validate(
                job,
                from_attributes=True,
            )
            for job in jobs
        ],
        total_entries=total_entries,
    )
Loading

0 comments on commit acf106b

Please sign in to comment.