Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
de526a7ff575b0f9deb174bf9e779bceb505d27a867bd2e8a67598de35f37c2f
7d6ced1b0a60a60c192ccc102b750c2d893d1c388625f8a90bb95ce457d0d9c4
3,028 changes: 1,514 additions & 1,514 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 0 additions & 1 deletion airflow-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ dependencies = [
"setproctitle>=1.3.3",
# SQLAlchemy >=2.0.36 fixes Python 3.13 TypingOnly import AssertionError caused by new typing attributes (__static_attributes__, __firstlineno__)
"sqlalchemy[asyncio]>=2.0.36",
"sqlalchemy-jsonfield>=1.0",
"sqlalchemy-utils>=0.41.2",
"svcs>=25.1.0",
"tabulate>=0.9.0",
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/assets/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def register_asset_change(

event_kwargs = {
"asset_id": asset_model.id,
"extra": extra,
"extra": extra or {},
"partition_key": partition_key,
}
if task_instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@
from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op

from airflow.settings import json

# revision identifiers, used by Alembic.
revision = "ab34f260b71c"
down_revision = "d75389605139"
Expand All @@ -44,9 +41,7 @@
def upgrade():
"""Apply Add dataset_expression to DagModel."""
with op.batch_alter_table("dag") as batch_op:
batch_op.add_column(
sa.Column("dataset_expression", sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
)
batch_op.add_column(sa.Column("dataset_expression", sa.JSON(), nullable=True))


def downgrade():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op

import airflow
Expand All @@ -49,7 +48,7 @@ def upgrade():
sa.Column("dag_id", sa.String(length=250), nullable=False),
sa.Column("from_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("to_date", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
sa.Column("dag_run_conf", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=False),
sa.Column("dag_run_conf", sa.JSON(), nullable=False),
sa.Column("is_paused", sa.Boolean(), nullable=True),
sa.Column("max_active_runs", sa.Integer(), nullable=False),
sa.Column("created_at", airflow.utils.sqlalchemy.UtcDateTime(timezone=True), nullable=False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@
from typing import TYPE_CHECKING

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op
from sqlalchemy import text

from airflow.migrations.utils import mysql_drop_foreignkey_if_exists
from airflow.settings import json

# revision identifiers, used by Alembic.
revision = "05234396c6fc"
Expand Down Expand Up @@ -454,7 +452,7 @@ def upgrade():
batch_op.alter_column(
"dataset_expression",
new_column_name="asset_expression",
type_=sqlalchemy_jsonfield.JSONField(json=json),
type_=sa.JSON(),
)


Expand Down Expand Up @@ -756,5 +754,5 @@ def downgrade():
batch_op.alter_column(
"asset_expression",
new_column_name="dataset_expression",
type_=sqlalchemy_jsonfield.JSONField(json=json),
type_=sa.JSON(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op
from sqlalchemy_utils import UUIDType

Expand All @@ -51,7 +50,7 @@ def upgrade():
sa.Column("dagrun_id", sa.Integer(), nullable=True),
sa.Column("deadline", sa.DateTime(), nullable=False),
sa.Column("callback", sa.String(length=500), nullable=False),
sa.Column("callback_kwargs", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True),
sa.Column("callback_kwargs", sa.JSON(), nullable=True),
sa.PrimaryKeyConstraint("id", name=op.f("deadline_pkey")),
sa.ForeignKeyConstraint(columns=("dagrun_id",), refcolumns=["dag_run.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(columns=("dag_id",), refcolumns=["dag.dag_id"], ondelete="CASCADE"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@
from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op

from airflow.settings import json

revision = "dfee8bd5d574"
down_revision = "fe199e1abd77"
branch_labels = None
Expand All @@ -42,7 +39,7 @@
def upgrade():
op.add_column(
"dag",
sa.Column("deadline", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
sa.Column("deadline", sa.JSON(), nullable=True),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@

from __future__ import annotations

import sqlalchemy_jsonfield
import sqlalchemy as sa
from alembic import op
from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text
from sqlalchemy.dialects import postgresql

from airflow._shared.timezones import timezone
from airflow.settings import json
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
Expand All @@ -54,18 +53,18 @@ def upgrade():
primary_key=True,
nullable=False,
),
Column("options", sqlalchemy_jsonfield.JSONField(json=json), nullable=False),
Column("options", sa.JSON(), nullable=False),
Column("subject", Text, nullable=False),
Column("body", Text, nullable=True),
Column("defaults", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("defaults", sa.JSON(), nullable=True),
Column("multiple", Boolean, unique=False, default=False),
Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params", sa.JSON(), nullable=False, default={}),
Column("assignees", sa.JSON(), nullable=True),
Column("created_at", UtcDateTime(timezone=True), nullable=False, default=timezone.utcnow),
Column("responded_at", UtcDateTime, nullable=True),
Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("responded_by", sa.JSON(), nullable=True),
Column("chosen_options", sa.JSON(), nullable=True),
Column("params_input", sa.JSON(), nullable=False, default={}),
ForeignKeyConstraint(
["ti_id"],
["task_instance.id"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from __future__ import annotations

import sqlalchemy as sa
import sqlalchemy_jsonfield
from alembic import op

# revision identifiers, used by Alembic.
Expand All @@ -44,14 +43,12 @@ def upgrade():
with op.batch_alter_table("deadline", schema=None) as batch_op:
batch_op.drop_column("callback")
batch_op.drop_column("callback_kwargs")
batch_op.add_column(sa.Column("callback", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=False))
batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=False))


def downgrade():
"""Replace deadline table's JSON callback with string callback and JSON callback_kwargs."""
with op.batch_alter_table("deadline", schema=None) as batch_op:
batch_op.drop_column("callback")
batch_op.add_column(
sa.Column("callback_kwargs", sqlalchemy_jsonfield.jsonfield.JSONField(), nullable=True)
)
batch_op.add_column(sa.Column("callback_kwargs", sa.JSON(), nullable=True))
batch_op.add_column(sa.Column("callback", sa.String(length=500), nullable=False))
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@

from __future__ import annotations

import sqlalchemy_jsonfield
import sqlalchemy as sa
from alembic import op
from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text
from sqlalchemy.dialects import postgresql

from airflow._shared.timezones import timezone
from airflow.settings import json
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
Expand All @@ -54,18 +53,18 @@ def upgrade():
primary_key=True,
nullable=False,
),
Column("options", sqlalchemy_jsonfield.JSONField(json=json), nullable=False),
Column("options", sa.JSON(), nullable=False),
Column("subject", Text, nullable=False),
Column("body", Text, nullable=True),
Column("defaults", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("defaults", sa.JSON(), nullable=True),
Column("multiple", Boolean, unique=False, default=False),
Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params", sa.JSON(), nullable=False, default={}),
Column("assignees", sa.JSON(), nullable=True),
Column("created_at", UtcDateTime(timezone=True), nullable=False, default=timezone.utcnow),
Column("responded_at", UtcDateTime, nullable=True),
Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True),
Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}),
Column("responded_by", sa.JSON(), nullable=True),
Column("chosen_options", sa.JSON(), nullable=True),
Column("params_input", sa.JSON(), nullable=False, default={}),
ForeignKeyConstraint(
["ti_history_id"],
["task_instance_history.task_instance_id"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import column, select, table
from sqlalchemy_jsonfield import JSONField
from sqlalchemy_utils import UUIDType

from airflow.serialization.serde import deserialize
Expand Down Expand Up @@ -129,7 +128,7 @@ def migrate_all_data():
column("id", UUIDType(binary=False)),
column("dagrun_id", sa.Integer()),
column("deadline_time", UtcDateTime(timezone=True)),
column("callback", JSONField()),
column("callback", sa.JSON()),
column("callback_state", sa.String(20)),
column("missed", sa.Boolean()),
column("callback_id", UUIDType(binary=False)),
Expand Down Expand Up @@ -277,7 +276,7 @@ def migrate_all_data():
"deadline",
column("id", UUIDType(binary=False)),
column("callback_id", UUIDType(binary=False)),
column("callback", JSONField()),
column("callback", sa.JSON()),
column("callback_state", sa.String(20)),
column("trigger_id", sa.Integer()),
)
Expand Down Expand Up @@ -317,7 +316,7 @@ def migrate_all_data():
batch_op.add_column(sa.Column("trigger_id", sa.INTEGER(), autoincrement=False, nullable=True))

# Temporarily nullable until data has been migrated
batch_op.add_column(sa.Column("callback", JSONField(), nullable=True))
batch_op.add_column(sa.Column("callback", sa.JSON(), nullable=True))

# Make callback_id nullable so the associated callbacks can be cleared during migration
batch_op.alter_column("callback_id", existing_type=UUIDType(binary=False), nullable=True)
Expand All @@ -326,7 +325,7 @@ def migrate_all_data():

with op.batch_alter_table("deadline") as batch_op:
# Data for `callback` has been migrated so make it non-nullable
batch_op.alter_column("callback", existing_type=JSONField(), nullable=False)
batch_op.alter_column("callback", existing_type=sa.JSON(), nullable=False)

batch_op.drop_constraint(batch_op.f("deadline_callback_id_fkey"), type_="foreignkey")
batch_op.create_foreign_key(batch_op.f("deadline_trigger_id_fkey"), "trigger", ["trigger_id"], ["id"])
Expand Down
7 changes: 3 additions & 4 deletions airflow-core/src/airflow/models/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

import sqlalchemy_jsonfield
import sqlalchemy as sa
from sqlalchemy import (
Column,
ForeignKey,
Expand All @@ -39,7 +39,6 @@

from airflow._shared.timezones import timezone
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.sqlalchemy import UtcDateTime

if TYPE_CHECKING:
Expand Down Expand Up @@ -315,7 +314,7 @@ class AssetModel(Base):
default=str,
nullable=False,
)
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
extra: Mapped[dict] = mapped_column(sa.JSON(), nullable=False, default={})

created_at: Mapped[datetime] = mapped_column(UtcDateTime, default=timezone.utcnow, nullable=False)
updated_at: Mapped[datetime] = mapped_column(
Expand Down Expand Up @@ -797,7 +796,7 @@ class AssetEvent(Base):

id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
asset_id: Mapped[int] = mapped_column(Integer, nullable=False)
extra: Mapped[dict] = mapped_column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={})
extra: Mapped[dict] = mapped_column(sa.JSON(), nullable=False, default={})
source_task_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_dag_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
source_run_id: Mapped[str | None] = mapped_column(StringID(), nullable=True)
Expand Down
5 changes: 2 additions & 3 deletions airflow-core/src/airflow/models/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from enum import Enum
from typing import TYPE_CHECKING

import sqlalchemy as sa
from sqlalchemy import (
Boolean,
ForeignKeyConstraint,
Expand All @@ -40,12 +41,10 @@
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
from sqlalchemy_jsonfield import JSONField

from airflow._shared.timezones import timezone
from airflow.exceptions import AirflowException, DagNotFound
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime, nulls_first, with_row_locks
from airflow.utils.state import DagRunState
Expand Down Expand Up @@ -133,7 +132,7 @@ class Backfill(Base):
dag_id: Mapped[str] = mapped_column(StringID(), nullable=False)
from_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
to_date: Mapped[datetime] = mapped_column(UtcDateTime, nullable=False)
dag_run_conf: Mapped[dict] = mapped_column(JSONField(json=json), nullable=False, default={})
dag_run_conf: Mapped[dict] = mapped_column(sa.JSON(), nullable=False, default={})
is_paused: Mapped[bool | None] = mapped_column(Boolean, default=False, nullable=True)
"""
Controls whether new dag runs will be created for this backfill.
Expand Down
11 changes: 3 additions & 8 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import TYPE_CHECKING, Any, cast

import pendulum
import sqlalchemy_jsonfield
import sqlalchemy as sa
import structlog
from dateutil.relativedelta import relativedelta
from sqlalchemy import (
Expand Down Expand Up @@ -66,7 +66,6 @@
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey
from airflow.serialization.encoders import DAT, encode_deadline_alert
from airflow.serialization.enums import Encoding
from airflow.settings import json
from airflow.timetables.base import DataInterval, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import AssetTriggeredTimetable, NullTimetable, OnceTimetable
Expand Down Expand Up @@ -386,13 +385,9 @@ class DagModel(Base):
# Timetable Type
timetable_type: Mapped[str] = mapped_column(String(255), nullable=False, default="")
# Asset expression based on asset triggers
asset_expression: Mapped[dict[str, Any] | None] = mapped_column(
sqlalchemy_jsonfield.JSONField(json=json), nullable=True
)
asset_expression: Mapped[dict[str, Any] | None] = mapped_column(sa.JSON(), nullable=True)
# DAG deadline information
_deadline: Mapped[dict[str, Any] | None] = mapped_column(
"deadline", sqlalchemy_jsonfield.JSONField(json=json), nullable=True
)
_deadline: Mapped[dict[str, Any] | None] = mapped_column("deadline", sa.JSON(), nullable=True)
# Tags for view filter
tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
# Dag owner links for DAGs view
Expand Down
Loading
Loading