diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index 3b20b97be2ae3..71d6f932ddfe9 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -5e6b3557dfea1a0b4fc32156206212903ab2711ded591257121b70abf6f26765 \ No newline at end of file +93b4535c355e2b97770d31156a064a63b39d3efc570f7147696ed3ab048f1ef0 \ No newline at end of file diff --git a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py index 9263748f5fda2..82a80cc8903de 100644 --- a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py +++ b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py @@ -143,12 +143,10 @@ def downgrade(): elif dialect_name == "mysql": op.execute( """ - UPDATE task_instance_history tih - JOIN ( - SELECT id, ROW_NUMBER() OVER (ORDER BY id) AS row_num - FROM task_instance_history - ) AS temp ON tih.id = temp.id - SET tih.id = temp.row_num; + SET @row_number = 0; + UPDATE task_instance_history + SET id = (@row_number := @row_number + 1) + ORDER BY try_id; """ ) else: diff --git a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py index 80a3e252745eb..da67f0e4f9ebf 100644 --- a/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py +++ b/airflow-core/src/airflow/migrations/versions/0068_3_0_0_ti_table_id_unique_per_try.py @@ -96,7 +96,7 @@ def downgrade(): dialect_name = conn.dialect.name with op.batch_alter_table("task_reschedule", schema=None) as batch_op: batch_op.add_column( - sa.Column("try_number", sa.INTEGER(), autoincrement=False, nullable=False, default=1) + sa.Column("try_number", sa.INTEGER(), autoincrement=False, nullable=False, server_default="1") ) with op.batch_alter_table("task_instance_note", schema=None) as batch_op: @@ -117,5 +117,5 @@ def downgrade(): # (and on non sqlite batching isn't "a thing", it issue alter tables fine) with op.batch_alter_table("task_instance_history", schema=None) as batch_op: batch_op.add_column( - sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=False) + sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=True) ) diff --git a/airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py b/airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py index 08faeaaa5b055..37afc71b0b790 100644 --- a/airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py +++ b/airflow-core/src/airflow/migrations/versions/0082_3_1_0_make_bundle_name_not_nullable.py @@ -99,11 +99,23 @@ def upgrade(): def downgrade(): """Make bundle_name nullable.""" - with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey") + import contextlib - batch_op.alter_column("bundle_name", nullable=True, existing_type=StringID()) - with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.create_foreign_key( - batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"] - ) + dialect_name = op.get_bind().dialect.name + exitstack = contextlib.ExitStack() + + if dialect_name == "sqlite": + # SQLite requires foreign key constraints to be disabled during batch operations + conn = op.get_bind() + conn.execute(text("PRAGMA foreign_keys=OFF")) + exitstack.callback(conn.execute, text("PRAGMA foreign_keys=ON")) + + with exitstack: + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey") + batch_op.alter_column("bundle_name", nullable=True, existing_type=StringID()) + + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.create_foreign_key( + batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"] + )