
Commit
Drop task_fail table
This table was previously only used in the legacy Task Duration view.

Follow-up of #38339 (review)
kaxil committed Oct 29, 2024
1 parent b344cc1 commit f5268c8
Showing 12 changed files with 1,339 additions and 1,434 deletions.
75 changes: 75 additions & 0 deletions airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Drop task_fail table.
Revision ID: 5f57a45b8433
Revises: 486ac7936b78
Create Date: 2024-10-29 17:49:27.740730
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

from airflow.migrations.db_types import TIMESTAMP, StringID

# revision identifiers, used by Alembic.
revision = "5f57a45b8433"
down_revision = "486ac7936b78"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
    """Apply Drop task_fail table."""
    op.drop_table("task_fail")


def downgrade():
    """Re-add task_fail table."""
    op.create_table(
        "task_fail",
        sa.Column("id", sa.Integer(), primary_key=True, nullable=False),
        sa.Column("task_id", StringID(length=250), nullable=False),
        sa.Column("dag_id", StringID(length=250), nullable=False),
        sa.Column("run_id", StringID(length=250), nullable=False),
        sa.Column("map_index", sa.Integer(), server_default=sa.text("-1"), nullable=False),
        sa.Column("start_date", TIMESTAMP(timezone=True), nullable=True),
        sa.Column("end_date", TIMESTAMP(timezone=True), nullable=True),
        sa.Column("duration", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(
            ["dag_id", "task_id", "run_id", "map_index"],
            [
                "task_instance.dag_id",
                "task_instance.task_id",
                "task_instance.run_id",
                "task_instance.map_index",
            ],
            name="task_fail_ti_fkey",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("task_fail_pkey")),
    )
    with op.batch_alter_table("task_fail", schema=None) as batch_op:
        batch_op.create_index(
            "idx_task_fail_task_instance", ["dag_id", "task_id", "run_id", "map_index"], unique=False
        )
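
For anyone who wants to confirm the migration took effect, a minimal sketch along the following lines can be used. It is not part of this commit and assumes a SQLALCHEMY_CONN environment variable pointing at the Airflow metadata database.

# Minimal sketch, not part of this commit: check whether task_fail is gone
# after running the 3.0.0 migrations. Assumes SQLALCHEMY_CONN points at the
# Airflow metadata database.
import os

from sqlalchemy import create_engine, inspect

engine = create_engine(os.environ["SQLALCHEMY_CONN"])
if "task_fail" in inspect(engine).get_table_names():
    print("task_fail still present -- revision 5f57a45b8433 not applied yet")
else:
    print("task_fail has been dropped")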
3 changes: 0 additions & 3 deletions airflow/models/__init__.py
@@ -41,7 +41,6 @@
"Pool",
"RenderedTaskInstanceFields",
"SkipMixin",
"TaskFail",
"TaskInstance",
"TaskReschedule",
"Trigger",
@@ -103,7 +102,6 @@ def __getattr__(name):
"Pool": "airflow.models.pool",
"RenderedTaskInstanceFields": "airflow.models.renderedtifields",
"SkipMixin": "airflow.models.skipmixin",
"TaskFail": "airflow.models.taskfail",
"TaskInstance": "airflow.models.taskinstance",
"TaskReschedule": "airflow.models.taskreschedule",
"Trigger": "airflow.models.trigger",
@@ -132,7 +130,6 @@ def __getattr__(name):
from airflow.models.pool import Pool
from airflow.models.renderedtifields import RenderedTaskInstanceFields
from airflow.models.skipmixin import SkipMixin
- from airflow.models.taskfail import TaskFail
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.models.taskreschedule import TaskReschedule
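
The three deletions above touch the module's lazy-import machinery: TaskFail is dropped from __all__, from the lazy-import mapping, and from the TYPE_CHECKING import block. For readers unfamiliar with that pattern, here is a simplified sketch of how a mapping plus a module-level __getattr__ (PEP 562) resolves names on first access; the entries are illustrative, not the full Airflow implementation.

# Simplified sketch of the lazy-import pattern in airflow/models/__init__.py;
# illustrative only, the real module has more entries and extra handling.
from __future__ import annotations

import importlib

__lazy_imports: dict[str, str] = {
    "TaskInstance": "airflow.models.taskinstance",
    "Trigger": "airflow.models.trigger",
}


def __getattr__(name: str):
    # Import the backing module only when the attribute is first requested.
    path = __lazy_imports.get(name)
    if path is None:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return getattr(importlib.import_module(path), name)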
85 changes: 0 additions & 85 deletions airflow/models/taskfail.py

This file was deleted.
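
The deleted model's shape can be inferred from the downgrade() table definition in the new migration. A rough, illustrative reconstruction follows; it is not the original file, which also carried a constructor taking a TaskInstance (see the TaskFail(ti=ti) call removed below) and the idx_task_fail_task_instance index.

# Rough reconstruction of the removed TaskFail model, inferred from the
# downgrade() table definition above; illustrative, not the deleted file.
from sqlalchemy import Column, Integer

from airflow.models.base import Base, StringID
from airflow.utils.sqlalchemy import UtcDateTime


class TaskFail(Base):
    """One failed attempt of a task instance (removed in 3.0.0)."""

    __tablename__ = "task_fail"

    id = Column(Integer, primary_key=True)
    task_id = Column(StringID(), nullable=False)
    dag_id = Column(StringID(), nullable=False)
    run_id = Column(StringID(), nullable=False)
    map_index = Column(Integer, nullable=False, server_default="-1")
    start_date = Column(UtcDateTime)
    end_date = Column(UtcDateTime)
    duration = Column(Integer)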

5 changes: 0 additions & 5 deletions airflow/models/taskinstance.py
@@ -96,7 +96,6 @@
from airflow.models.log import Log
from airflow.models.param import process_params
from airflow.models.renderedtifields import get_serialized_template_fields
- from airflow.models.taskfail import TaskFail
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
@@ -3217,9 +3216,6 @@ def fetch_handle_failure_context(
if not test_mode:
session.add(Log(TaskInstanceState.FAILED.value, ti))

- # Log failure duration
- session.add(TaskFail(ti=ti))

ti.clear_next_method_args()

# In extreme cases (zombie in case of dag with parse error) we might _not_ have a Task.
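
The failure handler no longer writes a separate task_fail row; the same timing data is available on task_instance itself (and, for earlier tries, task_instance_history). A hedged, illustrative query for failed-task durations, not code from this commit, would look like this:

# Illustrative only (not part of this commit): failed-task durations read
# straight from task_instance instead of the dropped task_fail table.
from sqlalchemy import select

from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import TaskInstanceState

with create_session() as session:
    rows = session.execute(
        select(
            TaskInstance.dag_id,
            TaskInstance.task_id,
            TaskInstance.run_id,
            TaskInstance.duration,
        ).where(TaskInstance.state == TaskInstanceState.FAILED)
    ).all()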
@@ -3855,7 +3851,6 @@ def clear_db_references(self, session: Session):
from airflow.models.renderedtifields import RenderedTaskInstanceFields

tables: list[type[TaskInstanceDependencies]] = [
- TaskFail,
TaskInstanceNote,
TaskReschedule,
XCom,
2 changes: 1 addition & 1 deletion airflow/utils/db.py
@@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol):
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"2.10.3": "5f2621c13b39",
"3.0.0": "486ac7936b78",
"3.0.0": "5f57a45b8433",
}


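The version-to-revision map now resolves 3.0.0 to the new migration, whose down_revision points at the previous head 486ac7936b78, keeping the Alembic chain linear. A minimal sketch for checking which revision a metadata database is currently stamped with (illustrative, again assuming a SQLALCHEMY_CONN environment variable) follows.

# Illustrative sketch: read the Alembic revision stamped in the metadata DB
# and compare it with the new 3.0.0 head. Assumes SQLALCHEMY_CONN is set.
import os

from alembic.runtime.migration import MigrationContext
from sqlalchemy import create_engine

engine = create_engine(os.environ["SQLALCHEMY_CONN"])
with engine.connect() as conn:
    current = MigrationContext.configure(conn).get_current_revision()

print("at new head" if current == "5f57a45b8433" else f"stamped at {current}")
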
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
@@ -1 +1 @@
- 1f4da1c321e6d80cfd2f16eae1ca39a643099edb351fde6ea8c628e51ea02afb
+ 5ec73e38d7ce2b81c9930d0d03c61a33ca68162a75b0ee2dc79d120913c35223