Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5e6b3557dfea1a0b4fc32156206212903ab2711ded591257121b70abf6f26765
93b4535c355e2b97770d31156a064a63b39d3efc570f7147696ed3ab048f1ef0
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,10 @@ def downgrade():
elif dialect_name == "mysql":
op.execute(
"""
UPDATE task_instance_history tih
JOIN (
SELECT id, ROW_NUMBER() OVER (ORDER BY id) AS row_num
FROM task_instance_history
) AS temp ON tih.id = temp.id
SET tih.id = temp.row_num;
SET @row_number = 0;
UPDATE task_instance_history
SET id = (@row_number := @row_number + 1)
ORDER BY try_id;
"""
)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def downgrade():
dialect_name = conn.dialect.name
with op.batch_alter_table("task_reschedule", schema=None) as batch_op:
batch_op.add_column(
sa.Column("try_number", sa.INTEGER(), autoincrement=False, nullable=False, default=1)
sa.Column("try_number", sa.INTEGER(), autoincrement=False, nullable=False, server_default="1")
)

with op.batch_alter_table("task_instance_note", schema=None) as batch_op:
Expand All @@ -117,5 +117,5 @@ def downgrade():
# (and on non sqlite batching isn't "a thing", it issue alter tables fine)
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.add_column(
sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=False)
sa.Column("task_instance_id", UUIDType(binary=False), autoincrement=False, nullable=True)
)
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,23 @@ def upgrade():

def downgrade():
"""Make bundle_name nullable."""
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
import contextlib

batch_op.alter_column("bundle_name", nullable=True, existing_type=StringID())
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)
dialect_name = op.get_bind().dialect.name
exitstack = contextlib.ExitStack()

if dialect_name == "sqlite":
# SQLite requires foreign key constraints to be disabled during batch operations
conn = op.get_bind()
conn.execute(text("PRAGMA foreign_keys=OFF"))
exitstack.callback(conn.execute, text("PRAGMA foreign_keys=ON"))

with exitstack:
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
batch_op.alter_column("bundle_name", nullable=True, existing_type=StringID())

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)
Loading