diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py index bc31c8f70c5ed..1bef41813be40 100644 --- a/airflow/migrations/utils.py +++ b/airflow/migrations/utils.py @@ -58,3 +58,59 @@ def disable_sqlite_fkeys(op): op.execute("PRAGMA foreign_keys=on") else: yield op + + +def mysql_drop_foreignkey_if_exists(constraint_name: str, table_name: str, op) -> None: + """Older Mysql versions do not support DROP FOREIGN KEY IF EXISTS.""" + op.execute(f""" + CREATE PROCEDURE DropForeignKeyIfExists() + BEGIN + IF EXISTS ( + SELECT 1 + FROM information_schema.TABLE_CONSTRAINTS + WHERE + CONSTRAINT_SCHEMA = DATABASE() AND + TABLE_NAME = '{table_name}' AND + CONSTRAINT_NAME = '{constraint_name}' AND + CONSTRAINT_TYPE = 'FOREIGN KEY' + ) THEN + ALTER TABLE {table_name} + DROP CONSTRAINT {constraint_name}; + ELSE + SELECT 1; + END IF; + END; + CALL DropForeignKeyIfExists(); + DROP PROCEDURE DropForeignKeyIfExists; + """) + + +def _drop_fkey_if_exists(table: str, constraint_name: str, op) -> None: + dialect = op.get_bind().dialect.name + if dialect == "sqlite": + try: + with op.batch_alter_table(table, schema=None) as batch_op: + batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey") + except ValueError: + pass + elif dialect == "mysql": + mysql_drop_foreignkey_if_exists(constraint_name, table, op) + else: + op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}") + + +def _sqlite_guarded_drop_constraint( + *, + table: str, + key: str, + type_: str, + op, +) -> None: + conn = op.get_bind() + dialect_name = conn.dialect.name + try: + with op.batch_alter_table(table, schema=None) as batch_op: + batch_op.drop_constraint(key, type_=type_) + except ValueError: + if dialect_name != "sqlite": + raise diff --git a/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py b/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py index 11359e498430f..bdaca9ac05b57 100644 --- a/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py +++ b/airflow/migrations/versions/0060_2_0_0_remove_id_column_from_xcom.py @@ -31,6 +31,8 @@ from alembic import op from sqlalchemy import Column, Integer, inspect, text +from airflow.migrations.utils import _sqlite_guarded_drop_constraint + # revision identifiers, used by Alembic. revision = "bbf4a7ad0465" down_revision = "cf5dc11e79ad" @@ -121,7 +123,7 @@ def downgrade(): conn = op.get_bind() with op.batch_alter_table("xcom") as bop: if conn.dialect.name != "mssql": - bop.drop_constraint("pk_xcom", type_="primary") + _sqlite_guarded_drop_constraint(table="xcom", key="pk_xcom", type_="primary", op=op) bop.add_column(Column("id", Integer, nullable=False)) bop.create_primary_key("id", ["id"]) bop.create_index("idx_xcom_dag_task_date", ["dag_id", "task_id", "key", "execution_date"]) diff --git a/airflow/migrations/versions/0064_2_0_0_add_unique_constraint_to_conn_id.py b/airflow/migrations/versions/0064_2_0_0_add_unique_constraint_to_conn_id.py index b0c9cacbcebed..e0582e2192ec6 100644 --- a/airflow/migrations/versions/0064_2_0_0_add_unique_constraint_to_conn_id.py +++ b/airflow/migrations/versions/0064_2_0_0_add_unique_constraint_to_conn_id.py @@ -30,6 +30,7 @@ from alembic import op from airflow.exceptions import AirflowException +from airflow.migrations.utils import _sqlite_guarded_drop_constraint from airflow.models.base import COLLATION_ARGS # revision identifiers, used by Alembic. @@ -55,6 +56,6 @@ def upgrade(): def downgrade(): """Unapply Add unique constraint to ``conn_id`` and set it as non-nullable.""" with op.batch_alter_table("connection") as batch_op: - batch_op.drop_constraint(constraint_name="unique_conn_id", type_="unique") + _sqlite_guarded_drop_constraint(table="connection", key="unique_conn_id", type_="unique", op=op) batch_op.alter_column("conn_id", nullable=True, existing_type=sa.String(250)) diff --git a/airflow/migrations/versions/0073_2_0_0_prefix_dag_permissions.py b/airflow/migrations/versions/0073_2_0_0_prefix_dag_permissions.py index 4a0ff8f2dacf1..40aa5e4b0306e 100644 --- a/airflow/migrations/versions/0073_2_0_0_prefix_dag_permissions.py +++ b/airflow/migrations/versions/0073_2_0_0_prefix_dag_permissions.py @@ -70,7 +70,8 @@ def remove_prefix_in_individual_dag_permissions(session): .all() ) for permission in perms: - permission.resource.name = permission.resource.name[len(prefix) :] + if permission.resource.name.startswith(prefix): + permission.resource.name = permission.resource.name[len(prefix) :] session.commit() diff --git a/airflow/migrations/versions/0093_2_2_0_taskinstance_keyed_to_dagrun.py b/airflow/migrations/versions/0093_2_2_0_taskinstance_keyed_to_dagrun.py index 6788fe3010546..939e170500134 100644 --- a/airflow/migrations/versions/0093_2_2_0_taskinstance_keyed_to_dagrun.py +++ b/airflow/migrations/versions/0093_2_2_0_taskinstance_keyed_to_dagrun.py @@ -31,7 +31,7 @@ from sqlalchemy.sql import and_, column, select, table from airflow.migrations.db_types import TIMESTAMP, StringID -from airflow.migrations.utils import get_mssql_table_constraints +from airflow.migrations.utils import _sqlite_guarded_drop_constraint, get_mssql_table_constraints ID_LEN = 250 @@ -221,12 +221,14 @@ def upgrade(): with op.batch_alter_table("task_instance", schema=None) as batch_op: if dialect_name != "postgresql": - # TODO: Is this right for non-postgres? if dialect_name == "mssql": constraints = get_mssql_table_constraints(conn, "task_instance") pk, _ = constraints["PRIMARY KEY"].popitem() batch_op.drop_constraint(pk, type_="primary") - batch_op.drop_constraint("task_instance_pkey", type_="primary") + else: + _sqlite_guarded_drop_constraint( + table="task_instance", key="task_instance_pkey", type_="primary", op=op + ) batch_op.drop_index("ti_dag_date") batch_op.drop_index("ti_state_lkp") batch_op.drop_column("execution_date") diff --git a/airflow/migrations/versions/0100_2_3_0_add_taskmap_and_map_id_on_taskinstance.py b/airflow/migrations/versions/0100_2_3_0_add_taskmap_and_map_id_on_taskinstance.py index 8fc4c37162d90..a89e162dba72f 100644 --- a/airflow/migrations/versions/0100_2_3_0_add_taskmap_and_map_id_on_taskinstance.py +++ b/airflow/migrations/versions/0100_2_3_0_add_taskmap_and_map_id_on_taskinstance.py @@ -31,6 +31,7 @@ from alembic import op from sqlalchemy import CheckConstraint, Column, ForeignKeyConstraint, Integer, text +from airflow.migrations.utils import _sqlite_guarded_drop_constraint from airflow.models.base import StringID from airflow.utils.sqlalchemy import ExtendedJSON @@ -52,7 +53,9 @@ def upgrade(): # Change task_instance's primary key. with op.batch_alter_table("task_instance") as batch_op: # I think we always use this name for TaskInstance after 7b2661a43ba3? - batch_op.drop_constraint("task_instance_pkey", type_="primary") + _sqlite_guarded_drop_constraint( + table="task_instance", key="task_instance_pkey", type_="primary", op=op + ) batch_op.add_column(Column("map_index", Integer, nullable=False, server_default=text("-1"))) batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"]) diff --git a/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py b/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py index 5bb853a971a45..4d91865eaef58 100644 --- a/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py +++ b/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py @@ -29,7 +29,7 @@ from alembic import op from sqlalchemy import inspect -from airflow.migrations.utils import get_mssql_table_constraints +from airflow.migrations.utils import _drop_fkey_if_exists, get_mssql_table_constraints # revision identifiers, used by Alembic. revision = "3c94c427fdf6" @@ -45,10 +45,10 @@ def upgrade(): if conn.dialect.name in ["sqlite", "mysql"]: inspector = inspect(conn.engine) foreignkey = inspector.get_foreign_keys("dag_tag") + _drop_fkey_if_exists("dag_tag", foreignkey[0]["name"], op) with op.batch_alter_table( "dag_tag", ) as batch_op: - batch_op.drop_constraint(foreignkey[0]["name"], type_="foreignkey") batch_op.create_foreign_key( "dag_tag_dag_id_fkey", "dag", ["dag_id"], ["dag_id"], ondelete="CASCADE" ) diff --git a/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py index 0a62b550d40b9..8c1f4735fdb96 100644 --- a/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py +++ b/airflow/migrations/versions/0142_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py @@ -244,7 +244,12 @@ def upgrade(): """) ) - conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run")) + headers = ( + "id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, " + "external_trigger, run_type, conf, data_interval_start, data_interval_end, " + "last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number" + ) + conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run")) conn.execute(sa.text("DROP TABLE dag_run")) conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run")) conn.execute(sa.text("PRAGMA foreign_keys=on")) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 9ecc324f73ae6..ddee93beb4e24 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -f240c081c0e69e9c213b887cab50dc1b61e4a71010fcc2f177ab9e22adef229f \ No newline at end of file +c70c94b125dfd0a0e24d5815b6474e525e42a801d6b4915a3a97ca074d80b67e \ No newline at end of file