Skip to content
56 changes: 56 additions & 0 deletions airflow/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,59 @@ def disable_sqlite_fkeys(op):
op.execute("PRAGMA foreign_keys=on")
else:
yield op


def mysql_drop_foreignkey_if_exists(constraint_name: str, table_name: str, op) -> None:
"""Older Mysql versions do not support DROP FOREIGN KEY IF EXISTS."""
op.execute(f"""
CREATE PROCEDURE DropForeignKeyIfExists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{constraint_name}' AND
CONSTRAINT_TYPE = 'FOREIGN KEY'
) THEN
ALTER TABLE {table_name}
DROP CONSTRAINT {constraint_name};
ELSE
SELECT 1;
END IF;
END;
CALL DropForeignKeyIfExists();
DROP PROCEDURE DropForeignKeyIfExists;
""")


def _drop_fkey_if_exists(table: str, constraint_name: str, op) -> None:
dialect = op.get_bind().dialect.name
if dialect == "sqlite":
try:
with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey")
except ValueError:
pass
elif dialect == "mysql":
mysql_drop_foreignkey_if_exists(constraint_name, table, op)
else:
op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}")


def _sqlite_guarded_drop_constraint(
*,
table: str,
key: str,
type_: str,
op,
) -> None:
conn = op.get_bind()
dialect_name = conn.dialect.name
try:
with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(key, type_=type_)
except ValueError:
if dialect_name != "sqlite":
raise
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from alembic import op
from sqlalchemy import Column, Integer, inspect, text

from airflow.migrations.utils import _sqlite_guarded_drop_constraint

# revision identifiers, used by Alembic.
revision = "bbf4a7ad0465"
down_revision = "cf5dc11e79ad"
Expand Down Expand Up @@ -121,7 +123,7 @@ def downgrade():
conn = op.get_bind()
with op.batch_alter_table("xcom") as bop:
if conn.dialect.name != "mssql":
bop.drop_constraint("pk_xcom", type_="primary")
_sqlite_guarded_drop_constraint(table="xcom", key="pk_xcom", type_="primary", op=op)
bop.add_column(Column("id", Integer, nullable=False))
bop.create_primary_key("id", ["id"])
bop.create_index("idx_xcom_dag_task_date", ["dag_id", "task_id", "key", "execution_date"])
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from alembic import op

from airflow.exceptions import AirflowException
from airflow.migrations.utils import _sqlite_guarded_drop_constraint
from airflow.models.base import COLLATION_ARGS

# revision identifiers, used by Alembic.
Expand All @@ -55,6 +56,6 @@ def upgrade():
def downgrade():
"""Unapply Add unique constraint to ``conn_id`` and set it as non-nullable."""
with op.batch_alter_table("connection") as batch_op:
batch_op.drop_constraint(constraint_name="unique_conn_id", type_="unique")
_sqlite_guarded_drop_constraint(table="connection", key="unique_conn_id", type_="unique", op=op)

batch_op.alter_column("conn_id", nullable=True, existing_type=sa.String(250))
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def remove_prefix_in_individual_dag_permissions(session):
.all()
)
for permission in perms:
permission.resource.name = permission.resource.name[len(prefix) :]
if permission.resource.name.startswith(prefix):
permission.resource.name = permission.resource.name[len(prefix) :]
session.commit()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from sqlalchemy.sql import and_, column, select, table

from airflow.migrations.db_types import TIMESTAMP, StringID
from airflow.migrations.utils import get_mssql_table_constraints
from airflow.migrations.utils import _sqlite_guarded_drop_constraint, get_mssql_table_constraints

ID_LEN = 250

Expand Down Expand Up @@ -221,12 +221,14 @@ def upgrade():

with op.batch_alter_table("task_instance", schema=None) as batch_op:
if dialect_name != "postgresql":
# TODO: Is this right for non-postgres?
if dialect_name == "mssql":
constraints = get_mssql_table_constraints(conn, "task_instance")
pk, _ = constraints["PRIMARY KEY"].popitem()
batch_op.drop_constraint(pk, type_="primary")
batch_op.drop_constraint("task_instance_pkey", type_="primary")
else:
_sqlite_guarded_drop_constraint(
table="task_instance", key="task_instance_pkey", type_="primary", op=op
)
batch_op.drop_index("ti_dag_date")
batch_op.drop_index("ti_state_lkp")
batch_op.drop_column("execution_date")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from alembic import op
from sqlalchemy import CheckConstraint, Column, ForeignKeyConstraint, Integer, text

from airflow.migrations.utils import _sqlite_guarded_drop_constraint
from airflow.models.base import StringID
from airflow.utils.sqlalchemy import ExtendedJSON

Expand All @@ -52,7 +53,9 @@ def upgrade():
# Change task_instance's primary key.
with op.batch_alter_table("task_instance") as batch_op:
# I think we always use this name for TaskInstance after 7b2661a43ba3?
batch_op.drop_constraint("task_instance_pkey", type_="primary")
_sqlite_guarded_drop_constraint(
table="task_instance", key="task_instance_pkey", type_="primary", op=op
)
batch_op.add_column(Column("map_index", Integer, nullable=False, server_default=text("-1")))
batch_op.create_primary_key("task_instance_pkey", ["dag_id", "task_id", "run_id", "map_index"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from alembic import op
from sqlalchemy import inspect

from airflow.migrations.utils import get_mssql_table_constraints
from airflow.migrations.utils import _drop_fkey_if_exists, get_mssql_table_constraints

# revision identifiers, used by Alembic.
revision = "3c94c427fdf6"
Expand All @@ -45,10 +45,10 @@ def upgrade():
if conn.dialect.name in ["sqlite", "mysql"]:
inspector = inspect(conn.engine)
foreignkey = inspector.get_foreign_keys("dag_tag")
_drop_fkey_if_exists("dag_tag", foreignkey[0]["name"], op)
with op.batch_alter_table(
"dag_tag",
) as batch_op:
batch_op.drop_constraint(foreignkey[0]["name"], type_="foreignkey")
batch_op.create_foreign_key(
"dag_tag_dag_id_fkey", "dag", ["dag_id"], ["dag_id"], ondelete="CASCADE"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,12 @@ def upgrade():
""")
)

conn.execute(sa.text("INSERT INTO dag_run_new SELECT * FROM dag_run"))
headers = (
"id, dag_id, queued_at, execution_date, start_date, end_date, state, run_id, creating_job_id, "
"external_trigger, run_type, conf, data_interval_start, data_interval_end, "
"last_scheduling_decision, dag_hash, log_template_id, updated_at, clear_number"
)
conn.execute(sa.text(f"INSERT INTO dag_run_new ({headers}) SELECT {headers} FROM dag_run"))
conn.execute(sa.text("DROP TABLE dag_run"))
conn.execute(sa.text("ALTER TABLE dag_run_new RENAME TO dag_run"))
conn.execute(sa.text("PRAGMA foreign_keys=on"))
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f240c081c0e69e9c213b887cab50dc1b61e4a71010fcc2f177ab9e22adef229f
c70c94b125dfd0a0e24d5815b6474e525e42a801d6b4915a3a97ca074d80b67e