diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index bb0a75b55cd66..52b8da0f17f0d 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8 \ No newline at end of file +a71e5dbeae4b36b9a53cee21f8ca99f6465f8cfc84130e2891c8213f91f6a58e \ No newline at end of file diff --git a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py index cbd183f7b8f43..9a43b12134a41 100644 --- a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py +++ b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_make_dag_version_id_non_nullable_in_.py @@ -27,10 +27,15 @@ from __future__ import annotations +from textwrap import dedent + import sqlalchemy as sa -from alembic import op +from alembic import context, op from sqlalchemy_utils import UUIDType +from airflow.dag_processing.bundles.manager import DagBundlesManager +from airflow.models import DagBag + # revision identifiers, used by Alembic. revision = "5d3072c51bac" down_revision = "ffdb0566c7c0" @@ -39,9 +44,31 @@ airflow_version = "3.1.0" +def _reserialize_dags(): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + bundles = manager.get_all_dag_bundles() + for bundle in bundles: + bundle.initialize() + dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False) + dag_bag.sync_to_db(bundle.name, bundle_version=bundle.get_current_version()) + + def upgrade(): """Apply make dag_version_id non-nullable in TaskInstance.""" conn = op.get_bind() + if context.is_offline_mode(): + print( + dedent(""" + ------------ + -- WARNING: Unable to serialize the Dags in offline mode! + -- If any taskinstance row has a null dag_version_id when you do upgrade, the migration will fail. + ------------ + """) + ) + else: + # Reserialize all DAGs to ensure we have dag_version + _reserialize_dags() if conn.dialect.name == "postgresql": update_query = sa.text(""" UPDATE task_instance