Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT Add task instance event log table #40761

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions airflow/api_connexion/endpoints/task_event_log_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import select

from airflow.api_connexion import security
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.task_event_log_schema import (
TaskEventLogCollection,
task_event_log_collection_schema,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models.taskinstance import TaskEventLog
from airflow.utils.db import get_query_count
from airflow.utils.session import NEW_SESSION, provide_session

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.api_connexion.types import APIResponse


@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG)
@format_parameters({"limit": check_limit})
@provide_session
def get_task_event_logs(
    *,
    dag_id: str | None = None,
    task_id: str | None = None,
    run_id: str | None = None,
    map_index: int | None = None,
    try_number: int | None = None,
    limit: int,
    offset: int | None = None,
    session: Session = NEW_SESSION,
) -> APIResponse:
    """Get all log entries from the task event log.

    :param dag_id: only return entries for this DAG ID
    :param task_id: only return entries for this task ID
    :param run_id: only return entries for this DAG run ID
    :param map_index: only return entries for this map index
    :param try_number: only return entries for this try number
    :param limit: maximum number of entries to return
    :param offset: number of entries to skip before returning
    :param session: SQLAlchemy session, injected by ``provide_session``
    """
    query = select(TaskEventLog)

    if dag_id:
        query = query.where(TaskEventLog.dag_id == dag_id)
    if task_id:
        query = query.where(TaskEventLog.task_id == task_id)
    if run_id:
        query = query.where(TaskEventLog.run_id == run_id)
    # Explicit None checks: 0 is a legitimate map_index/try_number value but is
    # falsy, so a truthiness test would silently drop those filters.
    if map_index is not None:
        query = query.where(TaskEventLog.map_index == map_index)
    if try_number is not None:
        query = query.where(TaskEventLog.try_number == try_number)

    # Count before pagination so total_entries reflects the full filtered set.
    total_entries = get_query_count(query, session=session)

    # Deterministic ordering makes offset/limit pagination stable.
    query = query.order_by(TaskEventLog.id)
    logs = session.scalars(query.offset(offset).limit(limit)).all()
    return task_event_log_collection_schema.dump(
        TaskEventLogCollection(data=logs, total_entries=total_entries)
    )
96 changes: 96 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,37 @@ paths:
"404":
$ref: "#/components/responses/NotFound"

/taskEventLogs:
get:
summary: List log entries (experimental)
description: |
List log entries from task event log (experimental).

*New in version 2.10.0*
x-openapi-router-controller: airflow.api_connexion.endpoints.task_event_log_endpoint
operationId: get_task_event_logs
tags: [TaskEventLog]
parameters:
- $ref: "#/components/parameters/PageLimit"
- $ref: "#/components/parameters/PageOffset"
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDAGID"
- $ref: "#/components/parameters/FilterTaskID"
- $ref: "#/components/parameters/FilterRunID"
- $ref: "#/components/parameters/FilterMapIndex"
- $ref: "#/components/parameters/FilterTryNumber"
responses:
"200":
description: Success.
content:
application/json:
schema:
$ref: "#/components/schemas/TaskEventLogCollection"
"401":
$ref: "#/components/responses/Unauthenticated"
"403":
$ref: "#/components/responses/PermissionDenied"

/eventLogs:
get:
summary: List log entries
Expand Down Expand Up @@ -3378,6 +3409,49 @@ components:
readOnly: true
nullable: true

TaskEventLog:
type: object
description: Log of task events.
properties:
id:
description: The task event log ID
type: integer
readOnly: true
created_at:
description: The time when the event was recorded
format: date-time
type: string
readOnly: true
dag_id:
description: The DAG ID
type: string
readOnly: true
nullable: true
task_id:
description: The Task ID
type: string
readOnly: true
nullable: true
run_id:
description: The DAG Run ID
type: string
readOnly: true
nullable: true
map_index:
description: The map index of the mapped task instance
type: integer
readOnly: true
nullable: true
try_number:
description: The try number of the task instance
type: integer
readOnly: true
nullable: true
description:
description: A description of the event.
type: string
readOnly: true

EventLogCollection:
type: object
description: |
Expand All @@ -3393,6 +3467,21 @@ components:
$ref: "#/components/schemas/EventLog"
- $ref: "#/components/schemas/CollectionInfo"

TaskEventLogCollection:
type: object
description: |
Collection of task event logs. (Experimental)

*Added in version 2.10.0*
allOf:
- type: object
properties:
data:
type: array
items:
$ref: "#/components/schemas/TaskEventLog"
- $ref: "#/components/schemas/CollectionInfo"

ImportError:
type: object
properties:
Expand Down Expand Up @@ -5587,6 +5676,13 @@ components:
type: integer
description: Filter on map index for mapped task.

FilterTryNumber:
in: query
name: try_number
schema:
type: integer
description: Filter on try number for task instance.

OrderBy:
in: query
name: order_by
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_connexion/schemas/event_log_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.api_connexion.schemas.task_event_log_schema import TaskEventLogSchema
from airflow.models.log import Log


Expand Down Expand Up @@ -54,6 +55,7 @@ class EventLogCollectionSchema(Schema):
"""EventLog Collection Schema."""

event_logs = fields.List(fields.Nested(EventLogSchema))
task_event_logs = fields.List(fields.Nested(TaskEventLogSchema))
total_entries = fields.Int()


Expand Down
60 changes: 60 additions & 0 deletions airflow/api_connexion/schemas/task_event_log_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import NamedTuple

from marshmallow import Schema, fields
from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

from airflow.models.taskinstance import TaskEventLog


class TaskEventLogSchema(SQLAlchemySchema):
    """Serialization schema for a single task event log entry.

    All fields are dump-only: the task event log is read-only through the API.
    """

    class Meta:
        """Meta."""

        model = TaskEventLog

    id = auto_field(dump_only=True)
    # created_at is declared in the OpenAPI TaskEventLog schema and must be
    # serialized here as well, otherwise responses omit the event timestamp.
    created_at = auto_field(dump_only=True)
    dag_id = auto_field(dump_only=True)
    task_id = auto_field(dump_only=True)
    run_id = auto_field(dump_only=True)
    map_index = auto_field(dump_only=True)
    try_number = auto_field(dump_only=True)
    description = auto_field(dump_only=True)


class TaskEventLogCollection(NamedTuple):
    """List of task event log entries, with the total count for pagination."""

    data: list[TaskEventLog]
    total_entries: int


class TaskEventLogCollectionSchema(Schema):
    """TaskEventLog collection schema: a list of entries plus total count."""

    data = fields.List(fields.Nested(TaskEventLogSchema))
    total_entries = fields.Int()


# Shared singleton schema instances used by the API endpoints for serialization.
task_event_log_schema = TaskEventLogSchema()
task_event_log_collection_schema = TaskEventLogCollectionSchema()
1 change: 1 addition & 0 deletions airflow/auth/managers/models/resource_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class DagAccessEntity(Enum):
"""Enum of DAG entities the user tries to access."""

AUDIT_LOG = "AUDIT_LOG"
TASK_EVENT_LOG = "TASK_EVENT_LOG"
CODE = "CODE"
DEPENDENCIES = "DEPENDENCIES"
RUN = "RUN"
Expand Down
33 changes: 17 additions & 16 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
import logging
import sys
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple

import pendulum
Expand All @@ -33,7 +32,6 @@
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.task_context_logger import TaskContextLogger
from airflow.utils.state import TaskInstanceState

PARALLELISM: int = conf.getint("core", "PARALLELISM")
Expand Down Expand Up @@ -131,6 +129,16 @@ def __init__(self, parallelism: int = PARALLELISM):
self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {}
self.running: set[TaskInstanceKey] = set()
self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
self._task_event_logs: deque[tuple[TaskInstanceKey, str]] = deque()
"""
Deque for storing task event log messages.

This attribute is only internally public and should not be manipulated
directly by subclasses.

:meta private:
"""

self.attempts: dict[TaskInstanceKey, RunningRetryAttemptType] = defaultdict(RunningRetryAttemptType)

def __repr__(self):
Expand All @@ -139,6 +147,10 @@ def __repr__(self):
def start(self): # pragma: no cover
"""Executors may need to get things started."""

def log_task_event(self, *, ti_key: TaskInstanceKey, description: str) -> None:
    """Log an event to the task instance event log.

    The entry is only buffered in the internal ``_task_event_logs`` deque
    here; persistence to the database happens elsewhere (presumably a
    scheduler loop drains the buffer — confirm against callers).

    :param ti_key: key identifying the task instance the event belongs to
    :param description: human-readable description of the event
    """
    self._task_event_logs.append((ti_key, description))

def queue_command(
self,
task_instance: TaskInstance,
Expand Down Expand Up @@ -286,12 +298,11 @@ def trigger_tasks(self, open_slots: int) -> None:
self.log.info("queued but still running; attempt=%s task=%s", attempt.total_tries, key)
continue
# Otherwise, we give up and remove the task from the queue.
self.send_message_to_task_logs(
logging.ERROR,

self.log.error(
"could not queue task %s (still running after %d attempts).",
key,
attempt.total_tries,
ti=ti,
)
del self.attempts[key]
del self.queued_tasks[key]
Expand Down Expand Up @@ -523,16 +534,6 @@ def send_callback(self, request: CallbackRequest) -> None:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)

@cached_property
def _task_context_logger(self) -> TaskContextLogger:
return TaskContextLogger(
component_name="Executor",
call_site_logger=self.log,
)

def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey):
self._task_context_logger._log(level, msg, *args, ti=ti)

@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""
Expand Down
Loading
Loading