Skip to content

Commit

Permalink
get API working
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Jul 16, 2024
1 parent 84484bd commit e1048d5
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 42 deletions.
38 changes: 0 additions & 38 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import Log
from airflow.models.taskinstance import TaskEventLog
from airflow.utils import timezone
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -113,40 +112,3 @@ def get_event_logs(
return event_log_collection_schema.dump(
EventLogCollection(event_logs=event_logs, total_entries=total_entries)
)


@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
def get_task_event_logs(
*,
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
try_number: int | None = None,
limit: int,
offset: int | None = None,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get all log entries from event log."""
query = select(TaskEventLog)

if dag_id:
query = query.where(TaskEventLog.dag_id == dag_id)
if task_id:
query = query.where(TaskEventLog.task_id == task_id)
if run_id:
query = query.where(TaskEventLog.run_id == run_id)
if map_index:
query = query.where(TaskEventLog.map_index == map_index)
if try_number:
query = query.where(TaskEventLog.try_number == try_number)

total_entries = get_query_count(query, session=session)

query = query.order_by(TaskEventLog.id)
event_logs = session.scalars(query.offset(offset).limit(limit)).all()
return event_log_collection_schema.dump(
EventLogCollection(event_logs=event_logs, total_entries=total_entries)
)
75 changes: 75 additions & 0 deletions airflow/api_connexion/endpoints/task_event_log_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import select

from airflow.api_connexion import security
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.task_event_log_schema import (
TaskEventLogCollection,
task_event_log_collection_schema,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.taskinstance import TaskEventLog
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse


@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
def get_task_event_logs(
*,
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
try_number: int | None = None,
limit: int,
offset: int | None = None,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get all log entries from event log."""
query = select(TaskEventLog)

if dag_id:
query = query.where(TaskEventLog.dag_id == dag_id)
if task_id:
query = query.where(TaskEventLog.task_id == task_id)
if run_id:
query = query.where(TaskEventLog.run_id == run_id)
if map_index:
query = query.where(TaskEventLog.map_index == map_index)
if try_number:
query = query.where(TaskEventLog.try_number == try_number)

total_entries = get_query_count(query, session=session)

query = query.order_by(TaskEventLog.id)
logs = session.scalars(query.offset(offset).limit(limit)).all()
return task_event_log_collection_schema.dump(
TaskEventLogCollection(data=logs, total_entries=total_entries)
)
96 changes: 96 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,37 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/taskEventLogs:
get:
summary: List log entries (experimental)
description: |
List log entries from task event log (experimental).
*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_event_log_endpoint
operationId: get_task_event_logs
tags: [TaskEventLog]
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDAGID"
- $ref: "#/components/parameters/FilterTaskID"
- $ref: "#/components/parameters/FilterRunID"
- $ref: "#/components/parameters/FilterMapIndex"
- $ref: "#/components/parameters/FilterTryNumber"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskEventLogCollection"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"

/eventLogs:
get:
summary: List log entries
Expand Down Expand Up @@ -3378,6 +3409,49 @@ components:
readOnly: true
nullable: true

TaskEventLog:
type: object
description: Log of task events.
properties:
id:
description: The task event log ID
type: integer
readOnly: true
created_at:
description: The time when the event was recorded
format: date-time
type: string
readOnly: true
dag_id:
description: The DAG ID
type: string
readOnly: true
nullable: true
task_id:
description: The Task ID
type: string
readOnly: true
nullable: true
run_id:
description: The DAG Run ID
type: string
readOnly: true
nullable: true
map_index:
description: The DAG Run ID
type: integer
readOnly: true
nullable: true
try_number:
description: The DAG Run ID
type: integer
readOnly: true
nullable: true
description:
description: A description of the event.
type: string
readOnly: true

EventLogCollection:
type: object
description: |
Expand All @@ -3393,6 +3467,21 @@ components:
$ref: "#/components/schemas/EventLog"
- $ref: "#/components/schemas/CollectionInfo"

TaskEventLogCollection:
type: object
description: |
Collection of task event logs. (Experimental)
*Added in version 2.10.0*
allOf:
- type: object
properties:
data:
type: array
items:
$ref: "#/components/schemas/TaskEventLog"
- $ref: "#/components/schemas/CollectionInfo"

ImportError:
type: object
properties:
Expand Down Expand Up @@ -5587,6 +5676,13 @@ components:
type: integer
description: Filter on map index for mapped task.

FilterTryNumber:
in: query
name: try_number
schema:
type: integer
description: Filter on try number for task instance.

OrderBy:
in: query
name: order_by
Expand Down
11 changes: 7 additions & 4 deletions airflow/api_connexion/schemas/task_event_log_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ class Meta:
run_id = auto_field(dump_only=True)
map_index = auto_field(dump_only=True)
try_number = auto_field(dump_only=True)
message = auto_field(dump_only=True)
description = auto_field(dump_only=True)


class TaskEventLogCollection(NamedTuple):
"""List of import errors with metadata."""

event_logs: list[TaskEventLog]
data: list[TaskEventLog]
total_entries: int


class TaskEventLogCollectionSchema(Schema):
"""EventLog Collection Schema."""

event_logs = fields.List(fields.Nested(TaskEventLogSchema))
task_event_logs = fields.List(fields.Nested(TaskEventLogSchema))
data = fields.List(fields.Nested(TaskEventLogSchema))
total_entries = fields.Int()


task_event_log_schema = TaskEventLogSchema()
task_event_log_collection_schema = TaskEventLogCollectionSchema()
Loading

0 comments on commit e1048d5

Please sign in to comment.