diff --git a/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py new file mode 100644 index 0000000000000..07d012ddf5719 --- /dev/null +++ b/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py @@ -0,0 +1,145 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +remove pickled data from dagrun table. + +Revision ID: e39a26ac59f6 +Revises: 38770795785f +Create Date: 2024-12-01 08:33:15.425141 + +""" + +from __future__ import annotations + +import json +import pickle +from textwrap import dedent + +import sqlalchemy as sa +from alembic import context, op +from sqlalchemy import text +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "e39a26ac59f6" +down_revision = "38770795785f" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply remove pickled data from dagrun table.""" + conn = op.get_bind() + conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql") + op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True)) + + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! + -- The 'conf' column will be set to NULL in offline mode. + -- Avoid using offline mode if you need to retain 'conf' values. + ------------ + """) + ) + else: + BATCH_SIZE = 100 + offset = 0 + while True: + rows = conn.execute( + text( + f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" + ) + ).fetchall() + if not rows: + break + for row in rows: + row_id, pickle_data = row + + try: + original_data = pickle.loads(pickle_data) + json_data = json.dumps(original_data) + conn.execute( + text(""" + UPDATE dag_run + SET conf_json = :json_data + WHERE id = :id + """), + {"json_data": json_data, "id": row_id}, + ) + except Exception as e: + print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}") + continue + offset += BATCH_SIZE + + op.drop_column("dag_run", "conf") + + op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf") + + +def downgrade(): + """Unapply Remove pickled data from dagrun table.""" + conn = op.get_bind() + op.add_column("dag_run", sa.Column("conf_pickle", sa.PickleType(), nullable=True)) + + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode! + -- The 'conf' column will be set to NULL in offline mode. + -- Avoid using offline mode if you need to retain 'conf' values. + ------------ + """) + ) + + else: + BATCH_SIZE = 100 + offset = 0 + while True: + rows = conn.execute( + text( + f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}" + ) + ).fetchall() + if not rows: + break + for row in rows: + row_id, json_data = row + + try: + pickled_data = pickle.dumps(json_data, protocol=pickle.HIGHEST_PROTOCOL) + conn.execute( + text(""" + UPDATE dag_run + SET conf_pickle = :pickle_data + WHERE id = :id + """), + {"pickle_data": pickled_data, "id": row_id}, + ) + except Exception as e: + print(f"Error pickling dagrun conf for dagrun ID {row_id}: {e}") + continue + offset += BATCH_SIZE + + op.drop_column("dag_run", "conf") + + op.alter_column("dag_run", "conf_pickle", existing_type=sa.PickleType(), new_column_name="conf") diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index a5bef7e589cbe..15d275da1a465 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -25,6 +25,7 @@ import re2 from sqlalchemy import ( + JSON, Boolean, Column, Enum, @@ -32,7 +33,6 @@ ForeignKeyConstraint, Index, Integer, - PickleType, PrimaryKeyConstraint, String, Text, @@ -45,6 +45,7 @@ tuple_, update, ) +from sqlalchemy.dialects import postgresql from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import declared_attr, joinedload, relationship, synonym, validates @@ -138,7 +139,7 @@ class DagRun(Base, LoggingMixin): triggered_by = Column( Enum(DagRunTriggeredByType, native_enum=False, length=50) ) # Airflow component that triggered the run. - conf = Column(PickleType) + conf = Column(JSON().with_variant(postgresql.JSONB, "postgresql")) # These two must be either both NULL or both datetime. data_interval_start = Column(UtcDateTime) data_interval_end = Column(UtcDateTime) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 29d3dc5439cee..1a1eb6f4d3500 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -94,7 +94,7 @@ class MappedClassProtocol(Protocol): "2.9.2": "686269002441", "2.10.0": "22ed7efa9da2", "2.10.3": "5f2621c13b39", - "3.0.0": "38770795785f", + "3.0.0": "e39a26ac59f6", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 48c765c1699bd..3616222fd880c 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -f4ad824c8d9ff45e86002506edd83b540a88dab45bb292b1af96cd86dec5ecab \ No newline at end of file +ca59d711e6304f8bfdb25f49339d455602430dd6b880e420869fc892faef0596 \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index b0f6d6b896667..24f75b3247093 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1793,7 +1793,7 @@ conf - [BYTEA] + [JSONB] creating_job_id diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 166dd0183a6a0..62013ff8f799c 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``38770795785f`` (head) | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. | +| ``e39a26ac59f6`` (head) | ``38770795785f`` | ``3.0.0`` | remove pickled data from dagrun table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``38770795785f`` | ``5c9c0231baa2`` | ``3.0.0`` | Add asset reference models. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``5c9c0231baa2`` | ``237cef8dfea1`` | ``3.0.0`` | Remove processor_subdir. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ diff --git a/newsfragments/44533.significant.rst b/newsfragments/44533.significant.rst new file mode 100644 index 0000000000000..55619c244f5ef --- /dev/null +++ b/newsfragments/44533.significant.rst @@ -0,0 +1,5 @@ +During offline migration, ``DagRun.conf`` is cleared + +.. Provide additional contextual information + +The ``conf`` column is changing from pickle to json, thus, the values in that column cannot be migrated during offline migrations. If you want to retain ``conf`` values for existing DagRuns, you must do a normal, non-offline, migration.