2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
4a44b71c7fe18f8f7c60ac1d576d29c618ee370f810fe1bb8a1894a3d77fcb0d
f5a99cb756403f20d63b7a4d0df67f776ac43d53d9bcc75323c4b80044d409e4
@@ -206,7 +206,10 @@ def upgrade():
    op.execute(pg_uuid7_fn)

    # Migrate existing rows to UUID v7 using timestamp-based generation
    batch_num = 0
    while True:
        batch_num += 1
        print(f"processing batch {batch_num}")
        result = conn.execute(
            text(
                """
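The backfill above works because a UUID v7 embeds a 48-bit Unix millisecond timestamp in its top bits, so existing rows can be assigned ids that sort by their original timestamps. A minimal Python sketch of that layout (an illustration of the RFC 9562 field order, not the migration's pg_uuid7_fn, which does this in SQL):

import os
import time
import uuid

def uuid7_sketch() -> uuid.UUID:
    # RFC 9562 field order: 48-bit unix_ts_ms | 4-bit version (7)
    # | 12-bit rand_a | 2-bit variant (0b10) | 62-bit rand_b
    ts_ms = time.time_ns() // 1_000_000
    value = (ts_ms & 0xFFFFFFFFFFFF) << 80                               # timestamp
    value |= 0x7 << 76                                                   # version 7
    value |= (int.from_bytes(os.urandom(2), "big") & 0x0FFF) << 64       # rand_a
    value |= 0x2 << 62                                                   # variant
    value |= int.from_bytes(os.urandom(8), "big") & 0x3FFFFFFFFFFFFFFF   # rand_b
    return uuid.UUID(int=value)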
@@ -28,7 +28,7 @@
from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from alembic import context, op
from sqlalchemy import text
from sqlalchemy.dialects.mysql import LONGBLOB

@@ -77,9 +77,24 @@ def upgrade():
    condition = condition_templates.get(dialect)
    if not condition:
        raise RuntimeError(f"Unsupported dialect: {dialect}")

    # Key is a reserved keyword in MySQL, so we need to quote it
    quoted_key = conn.dialect.identifier_preparer.quote("key")
    if dialect == "postgresql" and not context.is_offline_mode():
        curr_timeout = (
            int(
                conn.execute(
                    text("""
                        SELECT setting
                        FROM pg_settings
                        WHERE name = 'statement_timeout'
                    """)
                ).scalar_one()
            )
            / 1000
        )
        if 0 < curr_timeout < 1800:
            print("setting local statement timeout to 1800s")
            conn.execute(text("SET LOCAL statement_timeout='1800s'"))

    # Archive pickled data using the condition
    conn.execute(
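Two details make the block above safe: pg_settings reports statement_timeout in milliseconds (hence the division by 1000, with 0 meaning disabled), and SET LOCAL lasts only until the enclosing transaction ends, so the session's configured timeout returns once the migration commits. A standalone sketch of the same check, assuming a hypothetical SQLAlchemy engine outside Alembic:

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost/airflow")  # placeholder DSN
with engine.begin() as conn:
    # the setting is reported in milliseconds; 0 means no timeout
    timeout_ms = int(
        conn.execute(
            text("SELECT setting FROM pg_settings WHERE name = 'statement_timeout'")
        ).scalar_one()
    )
    if 0 < timeout_ms / 1000 < 1800:
        # scoped to this transaction; the session default is restored on commit
        conn.execute(text("SET LOCAL statement_timeout = '1800s'"))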
@@ -47,6 +47,17 @@
def upgrade():
    """Apply remove pickled data from dagrun table."""
    conn = op.get_bind()
    empty_vals = {
        "mysql": "X'80057D942E'",
        "postgresql": r"'\x80057D942E'",
        "sqlite": "X'80057D942E'",
    }
    dialect = conn.dialect.name
    try:
        empty_val = empty_vals[dialect]
    except KeyError:
        raise RuntimeError(f"Dialect {dialect} not supported.")

    conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
    op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))
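The empty_vals literals are the hex encoding of a pickled empty dict, letting the batched SELECT below skip rows whose conf is just {}. A quick sanity check that reproduces the constant:

import pickle

# protocol 5 opcodes: PROTO (\x80\x05), EMPTY_DICT (}), MEMOIZE (\x94), STOP (.)
assert pickle.dumps({}, protocol=5) == b"\x80\x05}\x94."
assert pickle.dumps({}, protocol=5).hex().upper() == "80057D942E"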

@@ -61,12 +72,20 @@ def upgrade():
""")
)
else:
BATCH_SIZE = 100
BATCH_SIZE = 1000
offset = 0
while True:
err_count = 0
batch_num = offset + 1
print(f"converting dag run conf. batch={batch_num}")
rows = conn.execute(
text(
f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
"SELECT id, conf "
"FROM dag_run "
"WHERE conf IS not NULL "
f"AND conf != {empty_val}"
f"ORDER BY id LIMIT {BATCH_SIZE} "
f"OFFSET {offset}"
)
).fetchall()
if not rows:
@@ -85,9 +104,11 @@ def upgrade():
"""),
{"json_data": json_data, "id": row_id},
)
except Exception as e:
print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
except Exception:
err_count += 1
continue
if err_count:
print(f"could not convert dag run conf for {err_count} records. batch={batch_num}")
offset += BATCH_SIZE

op.drop_column("dag_run", "conf")
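A design note on the loop above: LIMIT/OFFSET pagination makes the database scan and discard offset rows on every batch, so later batches get progressively slower; it is workable here because conf itself is never modified during the pass, which keeps the ordered result set stable. A keyset variant (a sketch of the alternative, not what this migration does) would carry the last seen id instead of an offset:

last_id = 0
while True:
    rows = conn.execute(
        text(
            "SELECT id, conf FROM dag_run "
            "WHERE conf IS NOT NULL AND id > :last_id "
            "ORDER BY id LIMIT 1000"
        ),
        {"last_id": last_id},
    ).fetchall()
    if not rows:
        break
    last_id = rows[-1][0]  # resume after the largest id in this batch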
@@ -112,12 +133,16 @@ def downgrade():
        )

    else:
        BATCH_SIZE = 100
        BATCH_SIZE = 1000
        offset = 0
        while True:
            rows = conn.execute(
                text(
                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
                    "SELECT id, conf "
                    "FROM dag_run "
                    "WHERE conf IS NOT NULL "
                    f"ORDER BY id LIMIT {BATCH_SIZE} "
                    f"OFFSET {offset}"
                )
            ).fetchall()
            if not rows: