# Patch summary (commentary only -- patch tools skip text that precedes the first
# "diff --git" header; confirm for your tool before applying):
#   * Adds Alembic revision 62fb1d0a1252 (revises e79fc784f145, Airflow 3.2.0).
#     upgrade() creates two non-unique indexes:
#       - idx_task_instance_dag_version_id   on task_instance(dag_version_id)
#       - idx_dag_run_created_dag_version_id on dag_run(created_dag_version_id)
#     downgrade() drops the same two indexes, dag_run first, then task_instance.
#   * Declares the same two Index entries on the DagRun and TaskInstance models.
#   * Points the "3.2.0" entry in airflow/utils/db.py at 62fb1d0a1252.
#   * Lists 62fb1d0a1252 as the new head row in docs/migrations-ref.rst.
diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst
index 3b2080410e228..979886d18ea6c 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,10 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description                                                  |
 +=========================+==================+===================+==============================================================+
-| ``e79fc784f145`` (head) | ``0b112f49112d`` | ``3.2.0``         | add timetable_type to dag table for filtering.               |
+| ``62fb1d0a1252`` (head) | ``e79fc784f145`` | ``3.2.0``         | Add indexes on dag_version_id columns for db cleanup         |
+|                         |                  |                   | performance.                                                 |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``e79fc784f145``        | ``0b112f49112d`` | ``3.2.0``         | add timetable_type to dag table for filtering.               |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``0b112f49112d``        | ``c47f2e1ab9d4`` | ``3.2.0``         | Add exceeds max runs flag to dag model.                      |
 +-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_dag_version_id_indexes_for_db_cleanup.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_dag_version_id_indexes_for_db_cleanup.py
new file mode 100644
index 0000000000000..2cdf3bbaea4a2
--- /dev/null
+++ b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_dag_version_id_indexes_for_db_cleanup.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add indexes on dag_version_id columns for db cleanup performance.
+
+When running `airflow db clean -t dag_version`, the cleanup process needs to
+check for foreign key references in task_instance and dag_run tables. Without
+indexes on these columns, this results in full table scans that can take several
+minutes per batch on tables with hundreds of thousands of rows.
+
+This migration adds the missing indexes to speed up the cleanup operation.
+
+See: https://github.com/apache/airflow/issues/60145
+
+Revision ID: 62fb1d0a1252
+Revises: e79fc784f145
+Create Date: 2025-01-09 12:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+from alembic import op
+
+revision = "62fb1d0a1252"
+down_revision = "e79fc784f145"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+    """Add indexes on dag_version_id columns to speed up db cleanup."""
+    # task_instance.dag_version_id is used when cleaning dag_version records.
+    # The FK constraint alone doesn't create an index on the referencing side.
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.create_index(
+            "idx_task_instance_dag_version_id",
+            ["dag_version_id"],
+            unique=False,
+        )
+
+    # dag_run.created_dag_version_id is also checked during dag_version cleanup.
+    # Same situation - FK exists but no index on the column.
+    with op.batch_alter_table("dag_run", schema=None) as batch_op:
+        batch_op.create_index(
+            "idx_dag_run_created_dag_version_id",
+            ["created_dag_version_id"],
+            unique=False,
+        )
+
+
+def downgrade():
+    """Remove dag_version_id indexes."""
+    with op.batch_alter_table("dag_run", schema=None) as batch_op:
+        batch_op.drop_index("idx_dag_run_created_dag_version_id")
+
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.drop_index("idx_task_instance_dag_version_id")
diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py
index bd3973b77683d..a0b3a11f45442 100644
--- a/airflow-core/src/airflow/models/dagrun.py
+++ b/airflow-core/src/airflow/models/dagrun.py
@@ -237,6 +237,7 @@ class DagRun(Base, LoggingMixin):
         UniqueConstraint("dag_id", "logical_date", name="dag_run_dag_id_logical_date_key"),
         Index("idx_dag_run_dag_id", dag_id),
         Index("idx_dag_run_run_after", run_after),
+        Index("idx_dag_run_created_dag_version_id", created_dag_version_id),
         Index(
             "idx_dag_run_running_dags",
             "state",
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 7c0aa0dff6197..b1481198c4a25 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -451,6 +451,7 @@ class TaskInstance(Base, LoggingMixin):
         Index("ti_pool", pool, state, priority_weight),
         Index("ti_trigger_id", trigger_id),
         Index("ti_heartbeat", last_heartbeat_at),
+        Index("idx_task_instance_dag_version_id", dag_version_id),
         PrimaryKeyConstraint("id", name="task_instance_pkey"),
         UniqueConstraint("dag_id", "task_id", "run_id", "map_index", name="task_instance_composite_key"),
         ForeignKeyConstraint(
diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py
index d444689e26b56..8b6ce4b0d8982 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -112,7 +112,7 @@ class MappedClassProtocol(Protocol):
     "3.0.0": "29ce7909c52b",
     "3.0.3": "fe199e1abd77",
     "3.1.0": "cc92b33c6709",
-    "3.2.0": "e79fc784f145",
+    "3.2.0": "62fb1d0a1252",
 }
 
 # Prefix used to identify tables holding data moved during migration.