From f5268c82f494b3003a618c52ade85eb6a221a337 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Tue, 29 Oct 2024 18:24:50 +0000 Subject: [PATCH] Drop `task_fail` table This table was previously only used in Task Duraton Legacy view. Follow-up of https://github.com/apache/airflow/pull/38339#pullrequestreview-1949728059 --- .../0044_3_0_0__drop_task_fail_table.py | 75 + airflow/models/__init__.py | 3 - airflow/models/taskfail.py | 85 - airflow/models/taskinstance.py | 5 - airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2571 ++++++++--------- docs/apache-airflow/migrations-ref.rst | 4 +- newsfragments/43490.significant.rst | 4 + tests/models/test_dag.py | 2 - tests/models/test_dagrun.py | 8 +- tests/models/test_taskinstance.py | 12 +- 12 files changed, 1339 insertions(+), 1434 deletions(-) create mode 100644 airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py delete mode 100644 airflow/models/taskfail.py create mode 100644 newsfragments/43490.significant.rst diff --git a/airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py b/airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py new file mode 100644 index 0000000000000..1e499218262dd --- /dev/null +++ b/airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Drop task_fail table. + +Revision ID: 5f57a45b8433 +Revises: 486ac7936b78 +Create Date: 2024-10-29 17:49:27.740730 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.migrations.db_types import TIMESTAMP, StringID + +# revision identifiers, used by Alembic. +revision = "5f57a45b8433" +down_revision = "486ac7936b78" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply Drop task_fail table.""" + op.drop_table("task_fail") + + +def downgrade(): + """Re-add task_fail table.""" + op.create_table( + "task_fail", + sa.Column("id", sa.Integer(), primary_key=True, nullable=False), + sa.Column("task_id", StringID(length=250), nullable=False), + sa.Column("dag_id", StringID(length=250), nullable=False), + sa.Column("run_id", StringID(length=250), nullable=False), + sa.Column("map_index", sa.Integer(), server_default=sa.text("-1"), nullable=False), + sa.Column("start_date", TIMESTAMP(timezone=True), nullable=True), + sa.Column("end_date", TIMESTAMP(timezone=True), nullable=True), + sa.Column("duration", sa.Integer(), nullable=True), + sa.ForeignKeyConstraint( + ["dag_id", "task_id", "run_id", "map_index"], + [ + "task_instance.dag_id", + "task_instance.task_id", + "task_instance.run_id", + "task_instance.map_index", + ], + name="task_fail_ti_fkey", + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("task_fail_pkey")), + ) + with op.batch_alter_table("task_fail", schema=None) as batch_op: + batch_op.create_index( + "idx_task_fail_task_instance", ["dag_id", "task_id", "run_id", "map_index"], unique=False + ) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 1a998d60c5c92..7e71dddc65dfe 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -41,7 +41,6 @@ "Pool", "RenderedTaskInstanceFields", "SkipMixin", - "TaskFail", "TaskInstance", "TaskReschedule", "Trigger", @@ -103,7 +102,6 @@ def __getattr__(name): "Pool": "airflow.models.pool", "RenderedTaskInstanceFields": "airflow.models.renderedtifields", "SkipMixin": "airflow.models.skipmixin", - "TaskFail": "airflow.models.taskfail", "TaskInstance": "airflow.models.taskinstance", "TaskReschedule": "airflow.models.taskreschedule", "Trigger": "airflow.models.trigger", @@ -132,7 +130,6 @@ def __getattr__(name): from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.skipmixin import SkipMixin - from airflow.models.taskfail import TaskFail from airflow.models.taskinstance import TaskInstance, clear_task_instances from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskreschedule import TaskReschedule diff --git a/airflow/models/taskfail.py b/airflow/models/taskfail.py deleted file mode 100644 index 1bf7db7a11ac3..0000000000000 --- a/airflow/models/taskfail.py +++ /dev/null @@ -1,85 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Taskfail tracks the failed run durations of each task instance.""" - -from __future__ import annotations - -from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text -from sqlalchemy.orm import relationship - -from airflow.models.base import StringID, TaskInstanceDependencies -from airflow.utils.sqlalchemy import UtcDateTime - - -class TaskFail(TaskInstanceDependencies): - """TaskFail tracks the failed run durations of each task instance.""" - - __tablename__ = "task_fail" - - id = Column(Integer, primary_key=True) - task_id = Column(StringID(), nullable=False) - dag_id = Column(StringID(), nullable=False) - run_id = Column(StringID(), nullable=False) - map_index = Column(Integer, nullable=False, server_default=text("-1")) - start_date = Column(UtcDateTime) - end_date = Column(UtcDateTime) - duration = Column(Integer) - - __table_args__ = ( - Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index), - ForeignKeyConstraint( - [dag_id, task_id, run_id, map_index], - [ - "task_instance.dag_id", - "task_instance.task_id", - "task_instance.run_id", - "task_instance.map_index", - ], - name="task_fail_ti_fkey", - ondelete="CASCADE", - ), - ) - - # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining - # the relationship we can more easily find the execution date for these rows - dag_run = relationship( - "DagRun", - primaryjoin="""and_( - TaskFail.dag_id == foreign(DagRun.dag_id), - TaskFail.run_id == foreign(DagRun.run_id), - )""", - viewonly=True, - ) - - def __init__(self, ti): - self.dag_id = ti.dag_id - self.task_id = ti.task_id - self.run_id = ti.run_id - self.map_index = ti.map_index - self.start_date = ti.start_date - self.end_date = ti.end_date - if self.end_date and self.start_date: - self.duration = int((self.end_date - self.start_date).total_seconds()) - else: - self.duration = None - - def __repr__(self): - prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}" - if self.map_index != -1: - prefix += f" map_index={self.map_index}" - return prefix + ">" diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index f67f61529e151..95098181c07ee 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -96,7 +96,6 @@ from airflow.models.log import Log from airflow.models.param import process_params from airflow.models.renderedtifields import get_serialized_template_fields -from airflow.models.taskfail import TaskFail from airflow.models.taskinstancekey import TaskInstanceKey from airflow.models.taskmap import TaskMap from airflow.models.taskreschedule import TaskReschedule @@ -3217,9 +3216,6 @@ def fetch_handle_failure_context( if not test_mode: session.add(Log(TaskInstanceState.FAILED.value, ti)) - # Log failure duration - session.add(TaskFail(ti=ti)) - ti.clear_next_method_args() # In extreme cases (zombie in case of dag with parse error) we might _not_ have a Task. @@ -3855,7 +3851,6 @@ def clear_db_references(self, session: Session): from airflow.models.renderedtifields import RenderedTaskInstanceFields tables: list[type[TaskInstanceDependencies]] = [ - TaskFail, TaskInstanceNote, TaskReschedule, XCom, diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 6961302156adb..34a265fc004be 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -97,7 +97,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "486ac7936b78", + "3.0.0": "5f57a45b8433", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 990e4395339e5..bb6e976178d38 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -1f4da1c321e6d80cfd2f16eae1ca39a643099edb351fde6ea8c628e51ea02afb \ No newline at end of file +5ec73e38d7ce2b81c9930d0d03c61a33ca68162a75b0ee2dc79d120913c35223 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 5a75a96fc87ff..177c5a60f14ee 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,11 +4,11 @@ - - + + %3 - + log @@ -396,1180 +396,1109 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_alias--asset_alias_asset_event - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset_alias--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_alias_asset - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset--asset_active - -1 -1 + +1 +1 asset--asset_active - -1 -1 + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--dag_schedule_asset_reference - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset--task_outlet_asset_reference - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL asset--asset_dag_run_queue - -0..N -1 + +0..N +1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL asset_event--asset_alias_asset_event - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event--dagrun_asset_event - -0..N -1 + +0..N +1 dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -dag_display_name - - [VARCHAR(2000)] - -default_view - - [VARCHAR(25)] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_active - - [BOOLEAN] - -is_paused - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parsed_time - - [TIMESTAMP] - -last_pickled - - [TIMESTAMP] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -pickle_id - - [INTEGER] - -processor_subdir - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +dag_display_name + + [VARCHAR(2000)] + +default_view + + [VARCHAR(25)] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_active + + [BOOLEAN] + +is_paused + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parsed_time + + [TIMESTAMP] + +last_pickled + + [TIMESTAMP] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +pickle_id + + [INTEGER] + +processor_subdir + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] dag--dag_schedule_asset_alias_reference - -0..N -1 + +0..N +1 dag--dag_schedule_asset_reference - -0..N -1 + +0..N +1 dag--task_outlet_asset_reference - -0..N -1 + +0..N +1 dag--asset_dag_run_queue - -0..N -1 + +0..N +1 dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag--dag_tag - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag--dag_owner_attributes - -0..N -1 + +0..N +1 dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag--dag_warning - -0..N -1 + +0..N +1 log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [BYTEA] - -creating_job_id - - [INTEGER] - -dag_hash - - [VARCHAR(32)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -external_trigger - - [BOOLEAN] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - NOT NULL - -queued_at - - [TIMESTAMP] - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [BYTEA] + +creating_job_id + + [INTEGER] + +dag_hash + + [VARCHAR(32)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +external_trigger + + [BOOLEAN] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + NOT NULL + +queued_at + + [TIMESTAMP] + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +updated_at + + [TIMESTAMP] log_template--dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run--dagrun_asset_event - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -job_id - - [INTEGER] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSON] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +job_id + + [INTEGER] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSON] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] dag_run--task_instance - -0..N -1 + +0..N +1 dag_run--task_instance - -0..N -1 + +0..N +1 backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL dag_run--backfill_dag_run - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run--dag_run_note - -1 -1 + +1 +1 task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -try_number - - [INTEGER] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL dag_run--task_reschedule - -0..N -1 + +0..N +1 dag_run--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 - + task_instance--task_reschedule - -0..N -1 + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 + +0..N +1 task_instance--rendered_task_instance_fields - -0..N -1 - - - -task_fail - -task_fail - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -duration - - [INTEGER] - -end_date - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -task_id - - [VARCHAR(250)] - NOT NULL - - - -task_instance--task_fail - -0..N -1 - - - -task_instance--task_fail - -0..N -1 - - - -task_instance--task_fail - -0..N -1 - - - -task_instance--task_fail - -0..N -1 + +0..N +1 - + task_map task_map @@ -1604,35 +1533,35 @@ NOT NULL - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + task_instance--task_map - -0..N -1 + +0..N +1 - + xcom xcom @@ -1677,35 +1606,35 @@ [BYTEA] - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance--xcom - -0..N -1 + +0..N +1 - + task_instance_note task_instance_note @@ -1749,35 +1678,35 @@ [VARCHAR(128)] - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance--task_instance_note - -0..N -1 + +0..N +1 - + task_instance_history task_instance_history @@ -1923,32 +1852,32 @@ [TIMESTAMP] - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 - + task_instance--task_instance_history - -0..N -1 + +0..N +1 @@ -2011,369 +1940,369 @@ backfill--dag_run - -0..N -{0,1} + +0..N +{0,1} backfill--backfill_dag_run - -0..N -1 + +0..N +1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] trigger--task_instance - -0..N -{0,1} + +0..N +{0,1} - + session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] - + alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL - + ab_user - -ab_user - -id - - [INTEGER] - NOT NULL - -active - - [BOOLEAN] - -changed_by_fk - - [INTEGER] - -changed_on - - [TIMESTAMP] - -created_by_fk - - [INTEGER] - -created_on - - [TIMESTAMP] - -email - - [VARCHAR(512)] - NOT NULL - -fail_login_count - - [INTEGER] - -first_name - - [VARCHAR(256)] - NOT NULL - -last_login - - [TIMESTAMP] - -last_name - - [VARCHAR(256)] - NOT NULL - -login_count - - [INTEGER] - -password - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_user + +id + + [INTEGER] + NOT NULL + +active + + [BOOLEAN] + +changed_by_fk + + [INTEGER] + +changed_on + + [TIMESTAMP] + +created_by_fk + + [INTEGER] + +created_on + + [TIMESTAMP] + +email + + [VARCHAR(512)] + NOT NULL + +fail_login_count + + [INTEGER] + +first_name + + [VARCHAR(256)] + NOT NULL + +last_login + + [TIMESTAMP] + +last_name + + [VARCHAR(256)] + NOT NULL + +login_count + + [INTEGER] + +password + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user--ab_user - -0..N -{0,1} + +0..N +{0,1} - + ab_user_role - -ab_user_role - -id - - [INTEGER] - NOT NULL - -role_id - - [INTEGER] - -user_id - - [INTEGER] + +ab_user_role + +id + + [INTEGER] + NOT NULL + +role_id + + [INTEGER] + +user_id + + [INTEGER] - + ab_user--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_register_user - -ab_register_user - -id - - [INTEGER] - NOT NULL - -email - - [VARCHAR(512)] - NOT NULL - -first_name - - [VARCHAR(256)] - NOT NULL - -last_name - - [VARCHAR(256)] - NOT NULL - -password - - [VARCHAR(256)] - -registration_date - - [TIMESTAMP] - -registration_hash - - [VARCHAR(256)] - -username - - [VARCHAR(512)] - NOT NULL + +ab_register_user + +id + + [INTEGER] + NOT NULL + +email + + [VARCHAR(512)] + NOT NULL + +first_name + + [VARCHAR(256)] + NOT NULL + +last_name + + [VARCHAR(256)] + NOT NULL + +password + + [VARCHAR(256)] + +registration_date + + [TIMESTAMP] + +registration_hash + + [VARCHAR(256)] + +username + + [VARCHAR(512)] + NOT NULL - + ab_permission - -ab_permission - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +ab_permission + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL - + ab_permission_view - -ab_permission_view - -id - - [INTEGER] - NOT NULL - -permission_id - - [INTEGER] - -view_menu_id - - [INTEGER] + +ab_permission_view + +id + + [INTEGER] + NOT NULL + +permission_id + + [INTEGER] + +view_menu_id + + [INTEGER] - + ab_permission--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_permission_view_role - -ab_permission_view_role - -id - - [INTEGER] - NOT NULL - -permission_view_id - - [INTEGER] - -role_id - - [INTEGER] + +ab_permission_view_role + +id + + [INTEGER] + NOT NULL + +permission_view_id + + [INTEGER] + +role_id + + [INTEGER] - + ab_permission_view--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + ab_view_menu - -ab_view_menu - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(250)] - NOT NULL + +ab_view_menu + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(250)] + NOT NULL - + ab_view_menu--ab_permission_view - -0..N -{0,1} + +0..N +{0,1} - + ab_role - -ab_role - -id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(64)] - NOT NULL + +ab_role + +id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(64)] + NOT NULL - + ab_role--ab_user_role - -0..N -{0,1} + +0..N +{0,1} - + ab_role--ab_permission_view_role - -0..N -{0,1} + +0..N +{0,1} - + alembic_version_fab - -alembic_version_fab - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version_fab + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 4d3cc11e12516..f3441ceaf72b1 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``486ac7936b78`` (head) | ``d59cbbef95eb`` | ``3.0.0`` | remove scheduler_lock column. | +| ``5f57a45b8433`` (head) | ``486ac7936b78`` | ``3.0.0`` | Drop task_fail table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``486ac7936b78`` | ``d59cbbef95eb`` | ``3.0.0`` | remove scheduler_lock column. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``d59cbbef95eb`` | ``05234396c6fc`` | ``3.0.0`` | Add UUID primary key to ``task_instance`` table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/newsfragments/43490.significant.rst b/newsfragments/43490.significant.rst new file mode 100644 index 0000000000000..6d0a0af8ec48b --- /dev/null +++ b/newsfragments/43490.significant.rst @@ -0,0 +1,4 @@ +The ``task_fail`` table has been removed from the Airflow database. + +This table was used to store task failures, but it was not used by any Airflow components. +Use the REST API to get task failures instead (which gets it from the ``task_instance`` table) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index afdfd9ab88892..2bca1c24acd4a 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -68,7 +68,6 @@ from airflow.models.dagrun import DagRun from airflow.models.param import DagParam, Param, ParamsDict from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskfail import TaskFail from airflow.models.taskinstance import TaskInstance as TI from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator @@ -160,7 +159,6 @@ def _clean_up(dag_id: str): with create_session() as session: session.query(DagRun).filter(DagRun.dag_id == dag_id).delete(synchronize_session=False) session.query(TI).filter(TI.dag_id == dag_id).delete(synchronize_session=False) - session.query(TaskFail).filter(TaskFail.dag_id == dag_id).delete(synchronize_session=False) @staticmethod def _occur_before(a, b, list_): diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index d788c56218c89..d0d78cff79d93 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -2524,12 +2524,11 @@ def print_value(value): def test_clearing_task_and_moving_from_non_mapped_to_mapped(dag_maker, session): """ Test that clearing a task and moving from non-mapped to mapped clears existing - references in XCom, TaskFail, TaskInstanceNote, TaskReschedule and + references in XCom, TaskInstanceNote, TaskReschedule and RenderedTaskInstanceFields. To be able to test this, RenderedTaskInstanceFields was not used in the test since it would require that the task is expanded first. """ - from airflow.models.taskfail import TaskFail from airflow.models.xcom import XCom @task @@ -2563,13 +2562,12 @@ def printx(x): # Purposely omitted RenderedTaskInstanceFields because the ti need # to be expanded but here we are mimicking and made it map_index -1 session.add(tr) - session.add(TaskFail(ti)) XCom.set(key="test", value="value", task_id=ti.task_id, dag_id=dag.dag_id, run_id=ti.run_id) session.commit() - for table in [TaskFail, TaskInstanceNote, TaskReschedule, XCom]: + for table in [TaskInstanceNote, TaskReschedule, XCom]: assert session.query(table).count() == 1 dr1.task_instance_scheduling_decisions(session) - for table in [TaskFail, TaskInstanceNote, TaskReschedule, XCom]: + for table in [TaskInstanceNote, TaskReschedule, XCom]: assert session.query(table).count() == 0 diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index fe4121fdffed9..20d8dc276a346 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -64,7 +64,6 @@ from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskfail import TaskFail from airflow.models.taskinstance import ( TaskInstance, TaskInstance as TI, @@ -2228,13 +2227,6 @@ def test_email_alert(x): assert body.startswith("Try 0") # try number only incremented by the scheduler assert "test_email_alert" in body - tf = ( - session.query(TaskFail) - .filter_by(dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, map_index=ti.map_index) - .one_or_none() - ) - assert tf, "TaskFail was recorded" - def test_set_duration(self): task = EmptyOperator(task_id="op", email="test@test.test") ti = TI(task=task) @@ -4074,13 +4066,13 @@ def test_operator_field_with_serialization(self, create_task_instance): @pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode def test_clear_db_references(self, session, create_task_instance): - tables = [TaskFail, RenderedTaskInstanceFields, XCom] + tables = [RenderedTaskInstanceFields, XCom] ti = create_task_instance() ti.note = "sample note" session.merge(ti) session.commit() - for table in [TaskFail, RenderedTaskInstanceFields]: + for table in [RenderedTaskInstanceFields]: session.add(table(ti)) XCom.set(key="key", value="value", task_id=ti.task_id, dag_id=ti.dag_id, run_id=ti.run_id) session.commit()