2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
968bf5c974c4a9663b6be095837d255a2cc8e25ce80209904e672b36edd14148
47bc0fb61cdb68be06d02c7f7d88bf39c0d454bd23e9ad924e218999c40fc121
108 changes: 54 additions & 54 deletions airflow-core/docs/img/airflow_erd.svg
(Regenerated ERD diagram; SVG diff not rendered.)
@@ -206,7 +206,10 @@ def upgrade():
op.execute(pg_uuid7_fn)

# Migrate existing rows with UUID v7 using a timestamp-based generation
batch_num = 0
while True:
batch_num += 1
print(f"processing batch {batch_num}")
result = conn.execute(
text(
"""
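For context on the hunk above: "UUID v7 using a timestamp-based generation" means each backfilled id carries a 48-bit Unix-millisecond timestamp in its leading bits, so ids sort by row-creation time. The migration does this in SQL (the pg_uuid7_fn executed above); what follows is only a rough, self-contained Python sketch of the RFC 9562 bit layout, with all names illustrative:

import os
import uuid

def uuid7_from_ms(ms: int) -> uuid.UUID:
    """Build a UUIDv7 whose leading bits encode a Unix timestamp in milliseconds."""
    rand_a = int.from_bytes(os.urandom(2), "big") & 0x0FFF  # 12 random bits
    rand_b = int.from_bytes(os.urandom(8), "big") & ((1 << 62) - 1)  # 62 random bits
    value = (ms & ((1 << 48) - 1)) << 80  # 48-bit millisecond timestamp
    value |= 0x7 << 76  # 4-bit version field: 7
    value |= rand_a << 64
    value |= 0x2 << 62  # 2-bit variant field: 0b10
    value |= rand_b
    return uuid.UUID(int=value)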
@@ -28,7 +28,7 @@
from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from alembic import context, op
from sqlalchemy import text
from sqlalchemy.dialects.mysql import LONGBLOB

@@ -77,9 +77,24 @@ def upgrade():
condition = condition_templates.get(dialect)
if not condition:
raise RuntimeError(f"Unsupported dialect: {dialect}")

# Key is a reserved keyword in MySQL, so we need to quote it
quoted_key = conn.dialect.identifier_preparer.quote("key")
if dialect == "postgresql" and not context.is_offline_mode():
curr_timeout = (
int(
conn.execute(
text("""
SELECT setting
FROM pg_settings
WHERE name = 'statement_timeout'
""")
).scalar_one()
)
/ 1000
)
if curr_timeout > 0 and curr_timeout < 1800:
print("setting local statement timeout to 1800s")
conn.execute(text("SET LOCAL statement_timeout='1800s'"))

# Archive pickled data using the condition
conn.execute(
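Two details in this hunk are easy to miss. The context.is_offline_mode() guard is needed because Alembic can render migrations as SQL without a live connection ("offline mode"), in which case there is nothing to run the pg_settings probe against. And SET LOCAL is transaction-scoped, so the widened timeout reverts on its own when the migration's transaction commits or rolls back. A minimal sketch of the same guard in isolation, assuming an open transaction on a SQLAlchemy connection conn:

from sqlalchemy import text

# pg_settings reports statement_timeout in milliseconds (as text).
timeout_ms = int(
    conn.execute(
        text("SELECT setting FROM pg_settings WHERE name = 'statement_timeout'")
    ).scalar_one()
)
if 0 < timeout_ms < 1800 * 1000:  # a limit is set, but below the 30-minute budget
    conn.execute(text("SET LOCAL statement_timeout = '1800s'"))  # reverts at COMMIT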
@@ -47,6 +47,17 @@
def upgrade():
"""Apply remove pickled data from dagrun table."""
conn = op.get_bind()
empty_vals = {
"mysql": "X'80057D942E'",
"postgresql": r"'\x80057D942E'",
"sqlite": "X'80057D942E'",
}
dialect = conn.dialect.name
try:
empty_val = empty_vals[dialect]
except KeyError:
raise RuntimeError(f"Dialect {dialect} not supported.")

conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))
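The three empty_vals literals above are the same five bytes spelled in each dialect's binary-literal syntax: the pickled empty dict. A dag run whose conf is an empty dict has nothing worth converting, so such rows are filtered out of the batches below. A quick check in plain Python (nothing Airflow-specific assumed):

import pickle

pickle.dumps({}).hex().upper()  # -> '80057D942E' (protocol 5, the default since Python 3.8)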

@@ -61,12 +72,20 @@ def upgrade():
""")
)
else:
BATCH_SIZE = 100
BATCH_SIZE = 1000
offset = 0
while True:
err_count = 0
batch_num = offset // BATCH_SIZE + 1
print(f"converting dag run conf. batch={batch_num}")
rows = conn.execute(
text(
f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
"SELECT id, conf "
"FROM dag_run "
"WHERE conf IS not NULL "
f"AND conf != {empty_val}"
f"ORDER BY id LIMIT {BATCH_SIZE} "
f"OFFSET {offset}"
)
).fetchall()
if not rows:
@@ -85,9 +104,11 @@ def upgrade():
"""),
{"json_data": json_data, "id": row_id},
)
except Exception as e:
print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
except Exception:
err_count += 1
continue
if err_count:
print(f"could not convert dag run conf for {err_count} records. batch={batch_num}")
offset += BATCH_SIZE

op.drop_column("dag_run", "conf")
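A design note on the loop above: LIMIT/OFFSET pagination forces the database to walk and discard offset rows on every batch, so the backfill degrades quadratically as dag_run grows. The offsets do stay stable here, since the WHERE clause remains true for already-converted rows until conf is dropped, but a keyset variant would do the same work in linear time. A sketch, assuming id is an indexed integer primary key:

last_id = 0
while True:
    rows = conn.execute(
        text(
            "SELECT id, conf FROM dag_run "
            "WHERE conf IS NOT NULL AND id > :last_id "
            f"ORDER BY id LIMIT {BATCH_SIZE}"
        ),
        {"last_id": last_id},
    ).fetchall()
    if not rows:
        break
    # ... convert each row exactly as above ...
    last_id = rows[-1][0]  # resume after the highest id seen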
@@ -112,12 +133,16 @@ def downgrade():
)

else:
BATCH_SIZE = 100
BATCH_SIZE = 1000
offset = 0
while True:
rows = conn.execute(
text(
f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
"SELECT id,conf "
"FROM dag_run "
"WHERE conf IS NOT NULL "
f"ORDER BY id LIMIT {BATCH_SIZE} "
f"OFFSET {offset}"
)
).fetchall()
if not rows:
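The per-row body of the downgrade loop is collapsed above. Assuming it mirrors the upgrade loop in reverse, re-pickling each JSON payload for the restored legacy column, it would look roughly like this (a hypothetical sketch, not the committed code; conf_pickle is a placeholder name):

import json
import pickle

for row_id, conf_json in rows:
    try:
        obj = json.loads(conf_json) if isinstance(conf_json, str) else conf_json
        conn.execute(
            # conf_pickle stands in for whichever column the downgrade
            # actually writes the re-pickled payload into.
            text("UPDATE dag_run SET conf_pickle = :data WHERE id = :id"),
            {"data": pickle.dumps(obj), "id": row_id},
        )
    except Exception:
        continue  # leave unconvertible rows alone, as the upgrade path does
offset += BATCH_SIZE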