Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d2e81695973bf8b6b30e1f4543627547330ef531e50be633cf589fbdf639b0e8
a71e5dbeae4b36b9a53cee21f8ca99f6465f8cfc84130e2891c8213f91f6a58e
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@

from __future__ import annotations

from textwrap import dedent

import sqlalchemy as sa
from alembic import op
from alembic import context, op
from sqlalchemy_utils import UUIDType

from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.models import DagBag

# revision identifiers, used by Alembic.
revision = "5d3072c51bac"
down_revision = "ffdb0566c7c0"
Expand All @@ -39,9 +44,31 @@
airflow_version = "3.1.0"


def _reserialize_dags():
manager = DagBundlesManager()
manager.sync_bundles_to_db()
bundles = manager.get_all_dag_bundles()
for bundle in bundles:
bundle.initialize()
dag_bag = DagBag(bundle.path, bundle_path=bundle.path, include_examples=False)
dag_bag.sync_to_db(bundle.name, bundle_version=bundle.get_current_version())


def upgrade():
"""Apply make dag_version_id non-nullable in TaskInstance."""
conn = op.get_bind()
if context.is_offline_mode():
print(
dedent("""
------------
-- WARNING: Unable to serialize the Dags in offline mode!
-- If any taskinstance row has a null dag_version_id when you do upgrade, the migration will fail.
------------
""")
)
else:
# Reserialize all DAGs to ensure we have dag_version
_reserialize_dags()
if conn.dialect.name == "postgresql":
update_query = sa.text("""
UPDATE task_instance
Expand Down