diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256
index 9a2b4388439c0..a76243f9c41cb 100644
--- a/airflow-core/docs/img/airflow_erd.sha256
+++ b/airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
-4a44b71c7fe18f8f7c60ac1d576d29c618ee370f810fe1bb8a1894a3d77fcb0d
\ No newline at end of file
+f5a99cb756403f20d63b7a4d0df67f776ac43d53d9bcc75323c4b80044d409e4
\ No newline at end of file
diff --git a/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
index ccd0697ff63ab..df7965d38e934 100644
--- a/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
+++ b/airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py
@@ -206,7 +206,10 @@ def upgrade():
         op.execute(pg_uuid7_fn)
 
     # Migrate existing rows with UUID v7 using a timestamp-based generation
+    batch_num = 0
     while True:
+        batch_num += 1
+        print(f"processing batch {batch_num}")
         result = conn.execute(
             text(
                 """
diff --git a/airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py b/airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
index c3972edbd12ab..fed378378290a 100644
--- a/airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
+++ b/airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py
@@ -28,7 +28,7 @@
 from __future__ import annotations
 
 import sqlalchemy as sa
-from alembic import op
+from alembic import context, op
 from sqlalchemy import text
 from sqlalchemy.dialects.mysql import LONGBLOB
 
@@ -77,9 +77,24 @@ def upgrade():
     condition = condition_templates.get(dialect)
     if not condition:
         raise RuntimeError(f"Unsupported dialect: {dialect}")
 
-    # Key is a reserved keyword in MySQL, so we need to quote it
     quoted_key = conn.dialect.identifier_preparer.quote("key")
+    if dialect == "postgresql" and not context.is_offline_mode():
+        curr_timeout = (
+            int(
+                conn.execute(
+                    text("""
+                        SELECT setting
+                        FROM pg_settings
+                        WHERE name = 'statement_timeout'
+                    """)
+                ).scalar_one()
+            )
+            / 1000
+        )
+        if curr_timeout > 0 and curr_timeout < 1800:
+            print("setting local statement timeout to 1800s")
+            conn.execute(text("SET LOCAL statement_timeout='1800s'"))
 
     # Archive pickled data using the condition
     conn.execute(
diff --git a/airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
index 07d012ddf5719..31b71e2bedc3b 100644
--- a/airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
+++ b/airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py
@@ -47,6 +47,17 @@ def upgrade():
     """Apply remove pickled data from dagrun table."""
     conn = op.get_bind()
 
+    empty_vals = {
+        "mysql": "X'80057D942E'",
+        "postgresql": r"'\x80057D942E'",
+        "sqlite": "X'80057D942E'",
+    }
+    dialect = conn.dialect.name
+    try:
+        empty_val = empty_vals[dialect]
+    except KeyError:
+        raise RuntimeError(f"Dialect {dialect} not supported.")
+
     conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
     op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))
 
@@ -61,12 +72,20 @@ def upgrade():
             """)
         )
     else:
-        BATCH_SIZE = 100
+        BATCH_SIZE = 1000
         offset = 0
         while True:
+            err_count = 0
+            batch_num = offset // BATCH_SIZE + 1
+            print(f"converting dag run conf. batch={batch_num}")
             rows = conn.execute(
                 text(
-                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                    "SELECT id, conf "
+                    "FROM dag_run "
+                    "WHERE conf IS NOT NULL "
+                    f"AND conf != {empty_val} "
+                    f"ORDER BY id LIMIT {BATCH_SIZE} "
+                    f"OFFSET {offset}"
                 )
             ).fetchall()
             if not rows:
                 break
@@ -85,9 +104,11 @@ def upgrade():
                         """),
                         {"json_data": json_data, "id": row_id},
                     )
-                except Exception as e:
-                    print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
+                except Exception:
+                    err_count += 1
                     continue
+            if err_count:
+                print(f"could not convert dag run conf for {err_count} records. batch={batch_num}")
             offset += BATCH_SIZE
 
     op.drop_column("dag_run", "conf")
@@ -112,12 +133,16 @@ def downgrade():
 
         )
     else:
-        BATCH_SIZE = 100
+        BATCH_SIZE = 1000
         offset = 0
         while True:
             rows = conn.execute(
                 text(
-                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                    "SELECT id, conf "
+                    "FROM dag_run "
+                    "WHERE conf IS NOT NULL "
+                    f"ORDER BY id LIMIT {BATCH_SIZE} "
+                    f"OFFSET {offset}"
                 )
             ).fetchall()
             if not rows:
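
Note on the sentinel used in upgrade(): the hex literal 80057D942E is the pickle protocol 5 serialization of an empty dict (PROTO 5, EMPTY_DICT, MEMOIZE, STOP), so dag runs whose conf is an empty dict can be filtered out in SQL instead of being fetched and rewritten as JSON one row at a time. A standalone sanity check, illustrative only and not part of the patch:

    import pickle

    # X'80057D942E' (MySQL/SQLite) and '\x80057D942E' (Postgres bytea) both
    # denote the bytes b"\x80\x05}\x94." -- an empty dict pickled at protocol 5.
    assert pickle.dumps({}, protocol=5).hex().upper() == "80057D942E"

The SET LOCAL in the xcom migration is scoped to the surrounding transaction, so the raised statement_timeout disappears once the migration commits; the pg_settings probe is skipped in offline mode because offline migrations only emit SQL and have no live connection to read the current setting from.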