From 2dc7c0957811e462b21edd40ae0345a687777c30 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 20 Nov 2024 18:08:18 +0100 Subject: [PATCH 1/7] Fix ORM vs migration files inconsistencies There have been some inconsistencies between ORM and migration files but it doesn't fail in tests. This is an attempt to fix the inconsistency and also have it fail in tests --- .../versions/0033_3_0_0_add_tables_for_backfill.py | 8 ++++++-- .../0036_3_0_0_add_name_field_to_dataset_model.py | 4 +--- .../0040_3_0_0_add_exception_reason_and_logical_date_.py | 2 +- airflow/models/__init__.py | 1 - airflow/models/asset.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- scripts/in_container/run_generate_migration.sh | 4 +++- tests_common/test_utils/db.py | 2 ++ 8 files changed, 15 insertions(+), 10 deletions(-) diff --git a/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py b/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py index bcbaa8b89a0ca..5ac391413e946 100644 --- a/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py +++ b/airflow/migrations/versions/0033_3_0_0_add_tables_for_backfill.py @@ -46,10 +46,10 @@ def upgrade(): op.create_table( "backfill", sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), - sa.Column("dag_id", sa.String(length=250), nullable=True), + sa.Column("dag_id", sa.String(length=250), nullable=False), sa.Column("from_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False), sa.Column("to_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False), - sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True), + sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=False), sa.Column("is_paused", sa.Boolean(), nullable=True), sa.Column("max_active_runs", sa.Integer(), nullable=False), sa.Column("created_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False), @@ -65,6 +65,10 @@ def 
upgrade(): sa.Column("sort_ordinal", sa.Integer(), nullable=False), sa.PrimaryKeyConstraint("id", name=op.f("backfill_dag_run_pkey")), sa.UniqueConstraint("backfill_id", "dag_run_id", name="ix_bdr_backfill_id_dag_run_id"), + sa.ForeignKeyConstraint( + ["backfill_id"], ["backfill.id"], name="bdr_backfill_fkey", ondelete="cascade" + ), + sa.ForeignKeyConstraint(["dag_run_id"], ["dag_run.id"], name="bdr_dag_run_fkey", ondelete="set null"), ) diff --git a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py index 353dcbf0f8fdd..2676176692a81 100644 --- a/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py +++ b/airflow/migrations/versions/0036_3_0_0_add_name_field_to_dataset_model.py @@ -61,9 +61,7 @@ def upgrade(): # Add 'name' column. Set it to nullable for now. with op.batch_alter_table("dataset", schema=None) as batch_op: batch_op.add_column(sa.Column("name", _STRING_COLUMN_TYPE)) - batch_op.add_column( - sa.Column("group", _STRING_COLUMN_TYPE, default=str, server_default="", nullable=False) - ) + batch_op.add_column(sa.Column("group", _STRING_COLUMN_TYPE, default="", nullable=False)) # Fill name from uri column. 
with Session(bind=op.get_bind()) as session: session.execute(sa.text("update dataset set name=uri")) diff --git a/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py b/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py index c4f96fb0ebf35..ab1a060af1d40 100644 --- a/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py +++ b/airflow/migrations/versions/0040_3_0_0_add_exception_reason_and_logical_date_.py @@ -42,7 +42,7 @@ def upgrade(): """Apply Add exception_reason and logical_date to BackfillDagRun.""" with op.batch_alter_table("backfill", schema=None) as batch_op: - batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=True)) + batch_op.add_column(sa.Column("reprocess_behavior", sa.String(length=250), nullable=False)) with op.batch_alter_table("backfill_dag_run", schema=None) as batch_op: batch_op.add_column(sa.Column("exception_reason", sa.String(length=250), nullable=True)) batch_op.add_column(sa.Column("logical_date", UtcDateTime(timezone=True), nullable=False)) diff --git a/airflow/models/__init__.py b/airflow/models/__init__.py index 6d8803410532a..731c7d60da4a3 100644 --- a/airflow/models/__init__.py +++ b/airflow/models/__init__.py @@ -63,7 +63,6 @@ def import_all_models(): import airflow.models.serialized_dag import airflow.models.taskinstancehistory import airflow.models.tasklog - import airflow.providers.fab.auth_manager.models def __getattr__(name): diff --git a/airflow/models/asset.py b/airflow/models/asset.py index d47986a85e560..c3c61b2785c88 100644 --- a/airflow/models/asset.py +++ b/airflow/models/asset.py @@ -119,7 +119,7 @@ class AssetAliasModel(Base): ), "mysql", ), - default=str, + default="", nullable=False, ) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 242d0e4220410..c2c67e6872f58 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ 
b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -7748eec981f977cc97b852d1fe982aebe24ec2d090ae8493a65cea101f9d42a5 \ No newline at end of file +2f8a9fc0bb125d3103c89af0afd0c33c041055243dcd6bc3de9682e544e180a2 \ No newline at end of file diff --git a/scripts/in_container/run_generate_migration.sh b/scripts/in_container/run_generate_migration.sh index 50a6985513247..46560c9112272 100755 --- a/scripts/in_container/run_generate_migration.sh +++ b/scripts/in_container/run_generate_migration.sh @@ -20,5 +20,7 @@ cd "${AIRFLOW_SOURCES}" || exit 1 cd "airflow" || exit 1 -airflow db reset +airflow db reset -y +airflow db downgrade -n 2.10.3 -y +airflow db migrate -r heads alembic revision --autogenerate -m "${@}" diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index c9b62d096b936..34b7dc71e4b21 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -63,6 +63,8 @@ def initial_db_init(): from airflow.www.extensions.init_auth_manager import get_auth_manager db.resetdb() + db.downgrade(to_revision="044f740568ec") + db.upgradedb(to_revision="head") db.bootstrap_dagbag() # minimal app to add roles flask_app = Flask(__name__) From 51e5bf72def02965b0a9dcce7612ab54325c1197 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 21 Nov 2024 15:42:16 +0100 Subject: [PATCH 2/7] fix for mysql and postgres --- ..._log_table_and_change_event_name_length.py | 4 +- .../0028_3_0_0_drop_ab_user_id_foreign_key.py | 14 ++++++ .../0032_3_0_0_drop_execution_date_unique.py | 14 +++++- ..._add_uuid_primary_key_to_task_instance_.py | 46 +++++++++++-------- ...0_0_remove_pickled_data_from_xcom_table.py | 5 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- tests_common/test_utils/db.py | 2 +- 7 files changed, 61 insertions(+), 26 deletions(-) diff --git a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py 
b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py index 44bef77578b65..22b9c4337811b 100644 --- a/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py +++ b/airflow/migrations/versions/0010_2_9_0_add_run_id_to_audit_log_table_and_change_event_name_length.py @@ -59,8 +59,8 @@ def downgrade(): if conn.dialect.name == "mssql": with op.batch_alter_table("log") as batch_op: batch_op.drop_index("idx_log_event") - batch_op.alter_column("event", type_=sa.String(30), nullable=False) + batch_op.alter_column("event", type_=sa.String(30)) batch_op.create_index("idx_log_event", ["event"]) else: with op.batch_alter_table("log") as batch_op: - batch_op.alter_column("event", type_=sa.String(30), nullable=False) + batch_op.alter_column("event", type_=sa.String(30)) diff --git a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py index f88aaa014bb3a..8a9c77042e95f 100644 --- a/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py +++ b/airflow/migrations/versions/0028_3_0_0_drop_ab_user_id_foreign_key.py @@ -45,6 +45,13 @@ def upgrade(): with op.batch_alter_table("task_instance_note", schema=None) as batch_op: batch_op.drop_constraint("task_instance_note_user_fkey", type_="foreignkey") + if op.get_bind().dialect.name == "mysql": + with op.batch_alter_table("dag_run_note", schema=None) as batch_op: + batch_op.drop_index("dag_run_note_user_fkey") + + with op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.drop_index("task_instance_note_user_fkey") + def downgrade(): """Unapply Drop ab_user.id foreign key.""" @@ -53,3 +60,10 @@ def downgrade(): with op.batch_alter_table("dag_run_note", schema=None) as batch_op: batch_op.create_foreign_key("dag_run_note_user_fkey", "ab_user", ["user_id"], ["id"]) + + if op.get_bind().dialect.name == "mysql": + with 
op.batch_alter_table("task_instance_note", schema=None) as batch_op: + batch_op.create_index("task_instance_note_user_fkey", ["user_id"], unique=False) + + with op.batch_alter_table("dag_run_note", schema=None) as batch_op: + batch_op.create_index("dag_run_note_user_fkey", ["user_id"], unique=False) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index b76bf209bd412..c782991881079 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -44,14 +44,24 @@ def upgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.alter_column("execution_date", new_column_name="logical_date", existing_type=sa.TIMESTAMP) + batch_op.alter_column( + "execution_date", + new_column_name="logical_date", + existing_type=sa.TIMESTAMP, + existing_nullable=False, + ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.drop_constraint("dag_run_dag_id_execution_date_key", type_="unique") def downgrade(): with op.batch_alter_table("dag_run", schema=None) as batch_op: - batch_op.alter_column("logical_date", new_column_name="execution_date", existing_type=sa.TIMESTAMP) + batch_op.alter_column( + "logical_date", + new_column_name="execution_date", + existing_type=sa.TIMESTAMP, + existing_nullable=False, + ) with op.batch_alter_table("dag_run", schema=None) as batch_op: batch_op.create_unique_constraint( "dag_run_dag_id_execution_date_key", diff --git a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py index 2abd2116f989a..41cfddc9cef0b 100644 --- a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py +++ b/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -167,6 +167,32 @@ 
def _get_type_id_column(dialect_name: str) -> sa.types.TypeEngine: return sa.String(36) +def create_foreign_keys(): + for fk in ti_fk_constraints: + if fk["table"] in ["task_instance_history", "task_map"]: + continue + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + ) + for fk in ti_fk_constraints: + if fk["table"] not in ["task_instance_history", "task_map"]: + continue + with op.batch_alter_table(fk["table"]) as batch_op: + batch_op.create_foreign_key( + constraint_name=fk["fk"], + referent_table=ti_table, + local_cols=ti_fk_cols, + remote_cols=ti_fk_cols, + ondelete="CASCADE", + onupdate="CASCADE", + ) + + def upgrade(): """Add UUID primary key to task instance table.""" conn = op.get_bind() @@ -232,15 +258,7 @@ def upgrade(): batch_op.create_primary_key("task_instance_pkey", ["id"]) # Create foreign key constraints - for fk in ti_fk_constraints: - with op.batch_alter_table(fk["table"]) as batch_op: - batch_op.create_foreign_key( - constraint_name=fk["fk"], - referent_table=ti_table, - local_cols=ti_fk_cols, - remote_cols=ti_fk_cols, - ondelete="CASCADE", - ) + create_foreign_keys() def downgrade(): @@ -270,12 +288,4 @@ def downgrade(): batch_op.create_primary_key("task_instance_pkey", ti_fk_cols) # Re-add foreign key constraints - for fk in ti_fk_constraints: - with op.batch_alter_table(fk["table"]) as batch_op: - batch_op.create_foreign_key( - constraint_name=fk["fk"], - referent_table=ti_table, - local_cols=ti_fk_cols, - remote_cols=ti_fk_cols, - ondelete="CASCADE", - ) + create_foreign_keys() diff --git a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py index 2b19827b6ae4c..ebe8e37a71ff0 100644 --- a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py +++ 
b/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py @@ -101,9 +101,9 @@ def upgrade(): op.execute( """ ALTER TABLE xcom - ALTER COLUMN value TYPE JSONB + ALTER COLUMN value TYPE JSON USING CASE - WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSONB) + WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSON) ELSE NULL END """ @@ -136,6 +136,7 @@ def upgrade(): # Drop the old `value_old` column with op.batch_alter_table("xcom", schema=None) as batch_op: batch_op.drop_column("value_old") + op.drop_table("_xcom_archive") def downgrade(): diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index c2c67e6872f58..d27f5eda6ce03 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -2f8a9fc0bb125d3103c89af0afd0c33c041055243dcd6bc3de9682e544e180a2 \ No newline at end of file +b12f6811bb7a340362e4b8774b3bb81db28a7d0258564b3431bd537368554cc3 \ No newline at end of file diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index 34b7dc71e4b21..53326cdf1af27 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -63,7 +63,7 @@ def initial_db_init(): from airflow.www.extensions.init_auth_manager import get_auth_manager db.resetdb() - db.downgrade(to_revision="044f740568ec") + db.downgrade(to_revision="5f2621c13b39") db.upgradedb(to_revision="head") db.bootstrap_dagbag() # minimal app to add roles From 8931a079ddd88bfbc4dea73998844be9b05c9305 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 22 Nov 2024 09:18:47 +0100 Subject: [PATCH 3/7] fixup! 
fix for mysql and postgres --- tests_common/test_utils/db.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests_common/test_utils/db.py b/tests_common/test_utils/db.py index 53326cdf1af27..e07e7a18cabe7 100644 --- a/tests_common/test_utils/db.py +++ b/tests_common/test_utils/db.py @@ -62,9 +62,12 @@ def initial_db_init(): from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.extensions.init_auth_manager import get_auth_manager + from tests_common.test_utils.compat import AIRFLOW_V_3_0_PLUS + db.resetdb() - db.downgrade(to_revision="5f2621c13b39") - db.upgradedb(to_revision="head") + if AIRFLOW_V_3_0_PLUS: + db.downgrade(to_revision="5f2621c13b39") + db.upgradedb(to_revision="head") db.bootstrap_dagbag() # minimal app to add roles flask_app = Flask(__name__) From cd71ac48c051f15994f802f40708cec4bf077011 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 22 Nov 2024 12:11:56 +0100 Subject: [PATCH 4/7] fix for sqlite --- .../0041_3_0_0_rename_dataset_as_asset.py | 18 +----- ..._0_0_create_fks_as_datasets_are_renamed.py | 55 +++++++++++++++++++ ...add_uuid_primary_key_to_task_instance_.py} | 4 +- ...044_3_0_0_remove_scheduler_lock_column.py} | 0 ...py => 0045_3_0_0__drop_task_fail_table.py} | 0 ...0_add_last_heartbeat_at_directly_to_ti.py} | 0 ...ing.py => 0047_3_0_0_drop_dag_pickling.py} | 0 ...ng.py => 0048_3_0_0_add_dag_versioning.py} | 1 + ...0049_3_0_0_add_trigger_asset_reference.py} | 0 ..._0_remove_pickled_data_from_xcom_table.py} | 0 docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/migrations-ref.rst | 4 +- 12 files changed, 64 insertions(+), 20 deletions(-) create mode 100644 airflow/migrations/versions/0042_3_0_0_create_fks_as_datasets_are_renamed.py rename airflow/migrations/versions/{0042_3_0_0_add_uuid_primary_key_to_task_instance_.py => 0043_3_0_0_add_uuid_primary_key_to_task_instance_.py} (99%) rename 
airflow/migrations/versions/{0043_3_0_0_remove_scheduler_lock_column.py => 0044_3_0_0_remove_scheduler_lock_column.py} (100%) rename airflow/migrations/versions/{0044_3_0_0__drop_task_fail_table.py => 0045_3_0_0__drop_task_fail_table.py} (100%) rename airflow/migrations/versions/{0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py => 0046_3_0_0_add_last_heartbeat_at_directly_to_ti.py} (100%) rename airflow/migrations/versions/{0046_3_0_0_drop_dag_pickling.py => 0047_3_0_0_drop_dag_pickling.py} (100%) rename airflow/migrations/versions/{0047_3_0_0_add_dag_versioning.py => 0048_3_0_0_add_dag_versioning.py} (99%) rename airflow/migrations/versions/{0048_3_0_0_add_trigger_asset_reference.py => 0049_3_0_0_add_trigger_asset_reference.py} (100%) rename airflow/migrations/versions/{0049_3_0_0_remove_pickled_data_from_xcom_table.py => 0050_3_0_0_remove_pickled_data_from_xcom_table.py} (100%) diff --git a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py index 03836503efe62..746ca46ec76f3 100644 --- a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py +++ b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py @@ -285,13 +285,6 @@ def upgrade(): unique=False, ) - batch_op.create_foreign_key( - constraint_name="toar_asset_fkey", - referent_table="asset", - local_cols=["asset_id"], - remote_cols=["id"], - ondelete="CASCADE", - ) batch_op.create_foreign_key( constraint_name="toar_dag_id_fkey", referent_table="dag", @@ -321,13 +314,6 @@ def upgrade(): unique=False, ) - batch_op.create_foreign_key( - constraint_name="adrq_asset_fkey", - referent_table="asset", - local_cols=["asset_id"], - remote_cols=["id"], - ondelete="CASCADE", - ) batch_op.create_foreign_key( constraint_name="adrq_dag_fkey", referent_table="dag", @@ -565,7 +551,7 @@ def downgrade(): with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op: batch_op.alter_column("asset_id", 
new_column_name="dataset_id", type_=sa.Integer(), nullable=False) - batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey") + # batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey") batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey") _rename_index( @@ -600,7 +586,7 @@ def downgrade(): with op.batch_alter_table("dataset_dag_run_queue", schema=None) as batch_op: batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) - batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey") + # batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey") batch_op.drop_constraint("adrq_dag_fkey", type_="foreignkey") _rename_pk_constraint( diff --git a/airflow/migrations/versions/0042_3_0_0_create_fks_as_datasets_are_renamed.py b/airflow/migrations/versions/0042_3_0_0_create_fks_as_datasets_are_renamed.py new file mode 100644 index 0000000000000..c7729ed99c3ff --- /dev/null +++ b/airflow/migrations/versions/0042_3_0_0_create_fks_as_datasets_are_renamed.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +create foreign key constraints for assets. 
+ +Revision ID: c4a1639f0f67 +Revises: 05234396c6fc +Create Date: 2024-11-22 09:49:41.813016 + +""" + +from __future__ import annotations + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "c4a1639f0f67" +down_revision = "05234396c6fc" +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Apply create foreign key constraints for assets.""" + with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op: + batch_op.create_foreign_key("adrq_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE") + + with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op: + batch_op.create_foreign_key("toar_asset_fkey", "asset", ["asset_id"], ["id"], ondelete="CASCADE") + + +def downgrade(): + """Unapply create foreign key constraints for assets.""" + with op.batch_alter_table("task_outlet_asset_reference", schema=None) as batch_op: + batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey") + + with op.batch_alter_table("asset_dag_run_queue", schema=None) as batch_op: + batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey") diff --git a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py b/airflow/migrations/versions/0043_3_0_0_add_uuid_primary_key_to_task_instance_.py similarity index 99% rename from airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py rename to airflow/migrations/versions/0043_3_0_0_add_uuid_primary_key_to_task_instance_.py index 41cfddc9cef0b..75c008798a794 100644 --- a/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py +++ b/airflow/migrations/versions/0043_3_0_0_add_uuid_primary_key_to_task_instance_.py @@ -20,7 +20,7 @@ Add UUID primary key to ``task_instance`` table. Revision ID: d59cbbef95eb -Revises: 05234396c6fc +Revises: c4a1639f0f67 Create Date: 2024-10-21 22:39:12.394079 """ @@ -33,7 +33,7 @@ # revision identifiers, used by Alembic. 
revision = "d59cbbef95eb" -down_revision = "05234396c6fc" +down_revision = "c4a1639f0f67" branch_labels = "None" depends_on = None airflow_version = "3.0.0" diff --git a/airflow/migrations/versions/0043_3_0_0_remove_scheduler_lock_column.py b/airflow/migrations/versions/0044_3_0_0_remove_scheduler_lock_column.py similarity index 100% rename from airflow/migrations/versions/0043_3_0_0_remove_scheduler_lock_column.py rename to airflow/migrations/versions/0044_3_0_0_remove_scheduler_lock_column.py diff --git a/airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py b/airflow/migrations/versions/0045_3_0_0__drop_task_fail_table.py similarity index 100% rename from airflow/migrations/versions/0044_3_0_0__drop_task_fail_table.py rename to airflow/migrations/versions/0045_3_0_0__drop_task_fail_table.py diff --git a/airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py b/airflow/migrations/versions/0046_3_0_0_add_last_heartbeat_at_directly_to_ti.py similarity index 100% rename from airflow/migrations/versions/0045_3_0_0_add_last_heartbeat_at_directly_to_ti.py rename to airflow/migrations/versions/0046_3_0_0_add_last_heartbeat_at_directly_to_ti.py diff --git a/airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py b/airflow/migrations/versions/0047_3_0_0_drop_dag_pickling.py similarity index 100% rename from airflow/migrations/versions/0046_3_0_0_drop_dag_pickling.py rename to airflow/migrations/versions/0047_3_0_0_drop_dag_pickling.py diff --git a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py similarity index 99% rename from airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py rename to airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py index e98bf5a200899..c76cad5fe82c1 100644 --- a/airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py +++ b/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py @@ -131,6 +131,7 @@ def 
upgrade(): def downgrade(): """Unapply add dag versioning.""" + _delete_serdag_and_code() with op.batch_alter_table("task_instance_history", schema=None) as batch_op: batch_op.drop_column("dag_version_id") diff --git a/airflow/migrations/versions/0048_3_0_0_add_trigger_asset_reference.py b/airflow/migrations/versions/0049_3_0_0_add_trigger_asset_reference.py similarity index 100% rename from airflow/migrations/versions/0048_3_0_0_add_trigger_asset_reference.py rename to airflow/migrations/versions/0049_3_0_0_add_trigger_asset_reference.py diff --git a/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py similarity index 100% rename from airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py rename to airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index d27f5eda6ce03..1fe2044c93a36 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -b12f6811bb7a340362e4b8774b3bb81db28a7d0258564b3431bd537368554cc3 \ No newline at end of file +7173ce2b98bc53467274ebe89bd3d4c8e7672f7b40b3f22989139ece45741f8f \ No newline at end of file diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 88a6079d6b6c5..582055210eb84 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -53,7 +53,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``486ac7936b78`` | ``d59cbbef95eb`` | ``3.0.0`` | remove scheduler_lock column. 
| +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``d59cbbef95eb`` | ``05234396c6fc`` | ``3.0.0`` | Add UUID primary key to ``task_instance`` table. | +| ``d59cbbef95eb`` | ``c4a1639f0f67`` | ``3.0.0`` | Add UUID primary key to ``task_instance`` table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``c4a1639f0f67`` | ``05234396c6fc`` | ``3.0.0`` | create foreign key constraints for assets. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``05234396c6fc`` | ``3a8972ecb8f9`` | ``3.0.0`` | Rename dataset as asset. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ From 7dc124590f0faafd13dbcd3ee34ca8ef094c5a14 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 22 Nov 2024 13:37:02 +0100 Subject: [PATCH 5/7] fixup! fix for sqlite --- .../versions/0032_3_0_0_drop_execution_date_unique.py | 7 ++++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index c782991881079..a08682afca422 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -31,9 +31,10 @@ from __future__ import annotations -import sqlalchemy as sa from alembic import op +from airflow.utils.sqlalchemy import UtcDateTime + # revision identifiers, used by Alembic. 
revision = "1cdc775ca98f" down_revision = "a2c32e6c7729" @@ -47,7 +48,7 @@ def upgrade(): batch_op.alter_column( "execution_date", new_column_name="logical_date", - existing_type=sa.TIMESTAMP, + existing_type=UtcDateTime, existing_nullable=False, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -59,7 +60,7 @@ def downgrade(): batch_op.alter_column( "logical_date", new_column_name="execution_date", - existing_type=sa.TIMESTAMP, + existing_type=UtcDateTime, existing_nullable=False, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 1fe2044c93a36..62d88dfe5a18b 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -7173ce2b98bc53467274ebe89bd3d4c8e7672f7b40b3f22989139ece45741f8f \ No newline at end of file +a81bc0639419e66abacc09d95177df593b10d8944708212ea2a96a51070b7b31 \ No newline at end of file From 47128e97fcc6614168c023a3f15db4c38a23eaf5 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Sun, 24 Nov 2024 09:40:36 +0100 Subject: [PATCH 6/7] fixup! fixup! 
fix for sqlite --- .../versions/0041_3_0_0_rename_dataset_as_asset.py | 2 -- .../migrations/versions/0048_3_0_0_add_dag_versioning.py | 1 - .../0050_3_0_0_remove_pickled_data_from_xcom_table.py | 7 ++++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- tests/utils/test_db.py | 2 ++ 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py index 746ca46ec76f3..55dbcdb2c840e 100644 --- a/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py +++ b/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py @@ -551,7 +551,6 @@ def downgrade(): with op.batch_alter_table("task_outlet_dataset_reference", schema=None) as batch_op: batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) - # batch_op.drop_constraint("toar_asset_fkey", type_="foreignkey") batch_op.drop_constraint("toar_dag_id_fkey", type_="foreignkey") _rename_index( @@ -586,7 +585,6 @@ def downgrade(): with op.batch_alter_table("dataset_dag_run_queue", schema=None) as batch_op: batch_op.alter_column("asset_id", new_column_name="dataset_id", type_=sa.Integer(), nullable=False) - # batch_op.drop_constraint("adrq_asset_fkey", type_="foreignkey") batch_op.drop_constraint("adrq_dag_fkey", type_="foreignkey") _rename_pk_constraint( diff --git a/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py b/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py index c76cad5fe82c1..e98bf5a200899 100644 --- a/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py +++ b/airflow/migrations/versions/0048_3_0_0_add_dag_versioning.py @@ -131,7 +131,6 @@ def upgrade(): def downgrade(): """Unapply add dag versioning.""" - _delete_serdag_and_code() with op.batch_alter_table("task_instance_history", schema=None) as batch_op: batch_op.drop_column("dag_version_id") diff --git 
a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py index ebe8e37a71ff0..ef357824e9efb 100644 --- a/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py +++ b/airflow/migrations/versions/0050_3_0_0_remove_pickled_data_from_xcom_table.py @@ -101,9 +101,9 @@ def upgrade(): op.execute( """ ALTER TABLE xcom - ALTER COLUMN value TYPE JSON + ALTER COLUMN value TYPE JSONB USING CASE - WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSON) + WHEN value IS NOT NULL THEN CAST(CONVERT_FROM(value, 'UTF8') AS JSONB) ELSE NULL END """ @@ -136,7 +136,6 @@ def upgrade(): # Drop the old `value_old` column with op.batch_alter_table("xcom", schema=None) as batch_op: batch_op.drop_column("value_old") - op.drop_table("_xcom_archive") def downgrade(): @@ -181,3 +180,5 @@ def downgrade(): with op.batch_alter_table("xcom", schema=None) as batch_op: batch_op.drop_column("value_old") + + op.drop_table("_xcom_archive", if_exists=True) diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 62d88dfe5a18b..9eeef16072c00 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -a81bc0639419e66abacc09d95177df593b10d8944708212ea2a96a51070b7b31 \ No newline at end of file +65f26e091cabd5c67f45abb682d1ef4fe63cd0d701b5729e5cf95d9cf2599c6a \ No newline at end of file diff --git a/tests/utils/test_db.py b/tests/utils/test_db.py index 1d3412d361412..be6b394d1f8dc 100644 --- a/tests/utils/test_db.py +++ b/tests/utils/test_db.py @@ -96,6 +96,8 @@ def test_database_schema_and_sqlalchemy_model_are_in_sync(self): lambda t: (t[0] == "remove_table" and t[1].name == "sqlite_sequence"), # fab version table lambda t: (t[0] == "remove_table" and t[1].name == "alembic_version_fab"), + # Ignore _xcom_archive table + lambda t: (t[0] == "remove_table" and 
t[1].name == "_xcom_archive"), ] for ignore in ignores: From 43ee1af1fa1ac46945f865525081afb1d730a772 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Sun, 24 Nov 2024 21:21:09 +0100 Subject: [PATCH 7/7] use TIMESTAMP from db_types --- .../versions/0032_3_0_0_drop_execution_date_unique.py | 6 +++--- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py index a08682afca422..399cc8aff91f3 100644 --- a/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py +++ b/airflow/migrations/versions/0032_3_0_0_drop_execution_date_unique.py @@ -33,7 +33,7 @@ from alembic import op -from airflow.utils.sqlalchemy import UtcDateTime +from airflow.migrations.db_types import TIMESTAMP # revision identifiers, used by Alembic. revision = "1cdc775ca98f" @@ -48,7 +48,7 @@ def upgrade(): batch_op.alter_column( "execution_date", new_column_name="logical_date", - existing_type=UtcDateTime, + existing_type=TIMESTAMP(timezone=True), existing_nullable=False, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: @@ -60,7 +60,7 @@ def downgrade(): batch_op.alter_column( "logical_date", new_column_name="execution_date", - existing_type=UtcDateTime, + existing_type=TIMESTAMP(timezone=True), existing_nullable=False, ) with op.batch_alter_table("dag_run", schema=None) as batch_op: diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index 9eeef16072c00..0886222b9e0e3 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -65f26e091cabd5c67f45abb682d1ef4fe63cd0d701b5729e5cf95d9cf2599c6a \ No newline at end of file +9f404e900b4d166d170d878fc00cad8e4397a4288167eebef1d77281a881e92d \ No newline at end of file diff --git 
a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 07796cce6c07c..cd1e5aac103be 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -1119,7 +1119,7 @@ value - [JSON] + [JSONB]