AIP-65: Add DAG versioning support #42913

Merged Nov 5, 2024
Commits (49)
a3275a7
AIP-65: Add DAG versioning support
ephraimbuddy Oct 10, 2024
ae5b6da
fixup! AIP-65: Add DAG versioning support
ephraimbuddy Oct 16, 2024
ed89ca6
fixup! fixup! AIP-65: Add DAG versioning support
ephraimbuddy Oct 16, 2024
ef506f9
fix migration
ephraimbuddy Oct 16, 2024
3821dbb
fix test
ephraimbuddy Oct 16, 2024
683bbd7
more test fixes
ephraimbuddy Oct 16, 2024
fe655de
update query count
ephraimbuddy Oct 16, 2024
ff9be22
fix static checks
ephraimbuddy Oct 17, 2024
7f50a3c
Fix query and add created_at to dag_version table
ephraimbuddy Oct 18, 2024
57bd887
improve code
ephraimbuddy Oct 20, 2024
e2bdad3
Change to using UUID for primary keys
ephraimbuddy Oct 20, 2024
a4e3c78
DagCode.bulk_write_code is no longer used
ephraimbuddy Oct 20, 2024
e994074
fixup! Change to using UUID for primary keys
ephraimbuddy Oct 20, 2024
e06b752
fix tests
ephraimbuddy Oct 21, 2024
0bef933
fixup! fix tests
ephraimbuddy Oct 21, 2024
d2bd271
use uuid for version_name
ephraimbuddy Oct 21, 2024
99abb36
fixup! use uuid for version_name
ephraimbuddy Oct 21, 2024
23204b8
use row lock when writing dag version
ephraimbuddy Oct 21, 2024
2525902
use row lock when writing dag version
ephraimbuddy Oct 21, 2024
610d3b1
fixup! use row lock when writing dag version
ephraimbuddy Oct 21, 2024
2cf03f5
deactivating dag should not remove serialized dags
ephraimbuddy Oct 22, 2024
ad91949
save version_name as string not uuid
ephraimbuddy Oct 22, 2024
11de3a2
Make dag_version_id unique
ephraimbuddy Oct 24, 2024
108c80c
fixup! Make dag_version_id unique
ephraimbuddy Oct 25, 2024
53a4837
Fix tests
ephraimbuddy Oct 25, 2024
554dd15
Use uuid7
ephraimbuddy Oct 25, 2024
72f2cc0
fix test
ephraimbuddy Oct 25, 2024
903ed3c
fixup! fix test
ephraimbuddy Oct 25, 2024
db44170
use binary=False for uuid field to fix sqlite issue
ephraimbuddy Oct 26, 2024
c4ed8af
apply suggestions from code review
ephraimbuddy Oct 29, 2024
4f011d2
Remove unnecessary version_name on dagmodel
ephraimbuddy Oct 29, 2024
4bf8b1f
Fix sqlalchemy 2 warning
ephraimbuddy Oct 29, 2024
0e0c127
Fix conflicts
ephraimbuddy Oct 30, 2024
e410f2c
Apply suggestions from code review
ephraimbuddy Oct 31, 2024
76982f3
fixup! Apply suggestions from code review
ephraimbuddy Oct 31, 2024
c4171cb
fixup! fixup! Apply suggestions from code review
ephraimbuddy Oct 31, 2024
dad0f36
add test for dagversion model and make version_name, number and dag_i…
ephraimbuddy Oct 31, 2024
13777e7
Remove commented test as serdag can no longer disappear
ephraimbuddy Oct 31, 2024
9f96cb0
Add SQLAlchemy-utils to requirements
ephraimbuddy Oct 31, 2024
b2c4844
mark test_dag_version.py as db_test
ephraimbuddy Oct 31, 2024
611a5cb
make version_name nullable
ephraimbuddy Oct 31, 2024
cc41fb6
Apply suggestions from code review
ephraimbuddy Nov 1, 2024
621a407
fixup! Apply suggestions from code review
ephraimbuddy Nov 1, 2024
108965b
remove file_updater
ephraimbuddy Nov 1, 2024
a51585e
Use dag_version for creating dagruns instead of dag_version_id
ephraimbuddy Nov 4, 2024
01f3c58
fix conflicts
ephraimbuddy Nov 4, 2024
11de1e9
use if TYPE_CHECKING
ephraimbuddy Nov 4, 2024
569eb53
Add docstrings to methods
ephraimbuddy Nov 5, 2024
a7bba02
Move getting latest serdags to SerializedDagModel
ephraimbuddy Nov 5, 2024
5 changes: 3 additions & 2 deletions airflow/api/common/trigger_dag.py
@@ -25,6 +25,7 @@
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.exceptions import DagNotFound, DagRunAlreadyExists
from airflow.models import DagBag, DagModel, DagRun
+ from airflow.models.dag_version import DagVersion
from airflow.utils import timezone
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState
@@ -92,14 +93,14 @@ def _trigger_dag(
run_conf = None
if conf:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)

+ dag_version = DagVersion.get_latest_version(dag.dag_id)
dag_run = dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=DagRunState.QUEUED,
conf=run_conf,
external_trigger=True,
- dag_hash=dag_bag.dags_hash.get(dag_id),
+ dag_version=dag_version,
data_interval=data_interval,
triggered_by=triggered_by,
)
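Note: `DagVersion.get_latest_version` lives in `airflow/models/dag_version.py`, which is not part of this diff excerpt. A minimal sketch of what such a lookup plausibly does, assuming the `dag_version` table created by the migration below (with `created_at` populated on insert):

```python
# Hypothetical helper -- not the PR's actual implementation.
from __future__ import annotations

from sqlalchemy import select
from sqlalchemy.orm import Session

from airflow.models.dag_version import DagVersion


def get_latest_dag_version(session: Session, dag_id: str) -> DagVersion | None:
    # Newest row wins; created_at is set on insert, and the UUIDv7 primary
    # keys used by this PR are time-ordered, so either column gives a
    # stable notion of "latest".
    return session.scalar(
        select(DagVersion)
        .where(DagVersion.dag_id == dag_id)
        .order_by(DagVersion.created_at.desc())
        .limit(1)
    )
```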
5 changes: 3 additions & 2 deletions airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -61,6 +61,7 @@
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun
+ from airflow.models.dag_version import DagVersion
from airflow.timetables.base import DataInterval
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.api_migration import mark_fastapi_migration_done
@@ -341,7 +342,7 @@ def post_dag_run(*, dag_id: str, session: Session = NEW_SESSION) -> APIResponse:
)
else:
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)

+ dag_version = DagVersion.get_latest_version(dag.dag_id)
dag_run = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id=run_id,
@@ -350,7 +351,7 @@
state=DagRunState.QUEUED,
conf=post_body.get("conf"),
external_trigger=True,
- dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
+ dag_version=dag_version,
session=session,
triggered_by=DagRunTriggeredByType.REST_API,
)
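For API clients nothing changes in the request shape; a run triggered through the stable REST API is simply pinned to the newest DagVersion server-side instead of carrying a dag_hash. A quick sketch (host, credentials, and dag id are placeholders):

```python
# Trigger a run over the stable REST API; the endpoint shape is unchanged
# by this PR, only the server-side bookkeeping differs.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/example_dag/dagRuns",
    json={"conf": {}},
    auth=("admin", "admin"),  # placeholder basic-auth credentials
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```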
15 changes: 0 additions & 15 deletions airflow/dag_processing/manager.py
@@ -50,7 +50,6 @@
from airflow.models.dagwarning import DagWarning
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
- from airflow.models.serialized_dag import SerializedDagModel
from airflow.secrets.cache import SecretCache
from airflow.stats import Stats
from airflow.traces.tracer import Trace, add_span
@@ -539,10 +538,6 @@ def deactivate_stale_dags(
if deactivated:
cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)

- for dag_id in to_deactivate:
- SerializedDagModel.remove_dag(dag_id)
- cls.logger().info("Deleted DAG %s in serialized_dag table", dag_id)

def _run_parsing_loop(self):
# In sync mode we want timeout=None -- wait forever until a message is received
if self._async_mode:
@@ -819,20 +814,10 @@ def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:

dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

- from airflow.models.dagcode import DagCode

- SerializedDagModel.remove_deleted_dags(
- alive_dag_filelocs=dag_filelocs,
- processor_subdir=self.get_dag_directory(),
- )
DagModel.deactivate_deleted_dags(
dag_filelocs,
processor_subdir=self.get_dag_directory(),
)
- DagCode.remove_deleted_code(
- dag_filelocs,
- processor_subdir=self.get_dag_directory(),
- )

return True
return False
5 changes: 3 additions & 2 deletions airflow/example_dags/plugins/event_listener.py
@@ -164,9 +164,10 @@ def on_dag_run_running(dag_run: DagRun, msg: str):
"""
print("Dag run in running state")
queued_at = dag_run.queued_at
- dag_hash_info = dag_run.dag_hash
- print(f"Dag information Queued at: {queued_at} hash info: {dag_hash_info}")
+ version = dag_run.dag_version.version

+ print(f"Dag information Queued at: {queued_at} version: {version}")


# [END howto_listen_dagrun_running_task]
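Listener code that previously read `dag_run.dag_hash` has to switch to the `dag_version` relationship, as the updated example shows. A minimal plugin wiring up such a listener might look like this (module layout and names are illustrative):

```python
# Illustrative listener plugin -- mirrors the updated example DAG listener.
from airflow.listeners import hookimpl
from airflow.plugins_manager import AirflowPlugin


class DagRunVersionListener:
    @hookimpl
    def on_dag_run_running(self, dag_run, msg: str):
        # dag_version is the DagVersion row pinned when the run was created.
        print(f"{dag_run.run_id} runs DAG version {dag_run.dag_version.version}")


class DagRunVersionPlugin(AirflowPlugin):
    name = "dag_run_version_plugin"
    listeners = [DagRunVersionListener()]
```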
22 changes: 11 additions & 11 deletions airflow/jobs/scheduler_job_runner.py
@@ -54,10 +54,10 @@
)
from airflow.models.backfill import Backfill
from airflow.models.dag import DAG, DagModel
+ from airflow.models.dag_version import DagVersion
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
- from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
from airflow.stats import Stats
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
@@ -1338,7 +1338,7 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
self.log.error("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
continue

- dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
+ latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

data_interval = dag.get_next_data_interval(dag_model)
# Explicitly check if the DagRun already exists. This is an edge case
@@ -1358,7 +1358,7 @@
data_interval=data_interval,
external_trigger=False,
session=session,
- dag_hash=dag_hash,
+ dag_version=latest_dag_version,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.TIMETABLE,
)
@@ -1417,7 +1417,7 @@ def _create_dag_runs_asset_triggered(
)
continue

- dag_hash = self.dagbag.dags_hash.get(dag.dag_id)
+ latest_dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

# Explicitly check if the DagRun already exists. This is an edge case
# where a Dag Run is created but `DagModel.next_dagrun` and `DagModel.next_dagrun_create_after`
@@ -1472,7 +1472,7 @@
state=DagRunState.QUEUED,
external_trigger=False,
session=session,
- dag_hash=dag_hash,
+ dag_version=latest_dag_version,
creating_job_id=self.job.id,
triggered_by=DagRunTriggeredByType.ASSET,
)
@@ -1750,18 +1750,20 @@ def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) ->
Return True if we determine that DAG still exists.
"""
- latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
- if dag_run.dag_hash == latest_version:
+ latest_dag_version = DagVersion.get_latest_version(dag_run.dag_id, session=session)
+ if TYPE_CHECKING:
+ assert latest_dag_version
+ if dag_run.dag_version_id == latest_dag_version.id:
self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
return True

- dag_run.dag_hash = latest_version

# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
if not dag_run.dag:
return False

+ dag_run.dag_version = latest_dag_version

# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)
return True
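Conceptually, the hash comparison becomes an id comparison against the newest DagVersion row; a standalone restatement (hypothetical helper, not PR code):

```python
# A run needs re-verification only when its pinned version is no longer the
# newest DagVersion row for that dag_id.
from airflow.models.dag_version import DagVersion


def dag_changed_since_run_created(dag_run, session) -> bool:
    latest = DagVersion.get_latest_version(dag_run.dag_id, session=session)
    return latest is not None and dag_run.dag_version_id != latest.id
```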
@@ -2041,7 +2043,6 @@ def _cleanup_stale_dags(self, session: Session = NEW_SESSION) -> None:
In case one of DagProcessors is stopped (in case there are multiple of them
for different dag folders), its dags are never marked as inactive.
- Also remove dags from SerializedDag table.
Executed on schedule only if [scheduler]standalone_dag_processor is True.
"""
self.log.debug("Checking dags not parsed within last %s seconds.", self._dag_stale_not_seen_duration)
@@ -2056,7 +2057,6 @@
self.log.info("Found (%d) stales dags not parsed after %s.", len(stale_dags), limit_lpt)
for dag in stale_dags:
dag.is_active = False
- SerializedDagModel.remove_dag(dag_id=dag.dag_id, session=session)
session.flush()

@provide_session
Expand Down
151 changes: 151 additions & 0 deletions airflow/migrations/versions/0047_3_0_0_add_dag_versioning.py
@@ -0,0 +1,151 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
add dag versioning.
Revision ID: 2b47dc6bc8df
Revises: d03e4a635aa3
Create Date: 2024-10-09 05:44:04.670984
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import UUIDType

from airflow.migrations.db_types import StringID
from airflow.models.base import naming_convention
from airflow.utils import timezone
from airflow.utils.sqlalchemy import UtcDateTime

# revision identifiers, used by Alembic.
revision = "2b47dc6bc8df"
down_revision = "d03e4a635aa3"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply add dag versioning."""
op.create_table(
"dag_version",
sa.Column("id", UUIDType(binary=False), nullable=False),
sa.Column("version_number", sa.Integer(), nullable=False),
sa.Column("version_name", StringID()),
sa.Column("dag_id", StringID(), nullable=False),
sa.Column("created_at", UtcDateTime(), nullable=False, default=timezone.utcnow),
sa.ForeignKeyConstraint(
("dag_id",), ["dag.dag_id"], name=op.f("dag_version_dag_id_fkey"), ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id", name=op.f("dag_version_pkey")),
sa.UniqueConstraint("dag_id", "version_number", name="dag_id_v_name_v_number_unique_constraint"),
)
with op.batch_alter_table("dag_code", recreate="always", naming_convention=naming_convention) as batch_op:
batch_op.drop_constraint("dag_code_pkey", type_="primary")
batch_op.add_column(
sa.Column("id", UUIDType(binary=False), primary_key=True), insert_before="fileloc_hash"
)
batch_op.create_primary_key("dag_code_pkey", ["id"])
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
batch_op.create_foreign_key(
batch_op.f("dag_code_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.create_unique_constraint("dag_code_dag_version_id_uq", ["dag_version_id"])

with op.batch_alter_table(
"serialized_dag", recreate="always", naming_convention=naming_convention
) as batch_op:
batch_op.drop_constraint("serialized_dag_pkey", type_="primary")
batch_op.add_column(sa.Column("id", UUIDType(binary=False), primary_key=True))
batch_op.drop_index("idx_fileloc_hash")
batch_op.drop_column("fileloc_hash")
batch_op.drop_column("fileloc")
batch_op.create_primary_key("serialized_dag_pkey", ["id"])
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False), nullable=False))
batch_op.create_foreign_key(
batch_op.f("serialized_dag_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.create_unique_constraint("serialized_dag_dag_version_id_uq", ["dag_version_id"])

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
batch_op.create_foreign_key(
batch_op.f("task_instance_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)

with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_version_id", UUIDType(binary=False)))
batch_op.create_foreign_key(
batch_op.f("dag_run_dag_version_id_fkey"),
"dag_version",
["dag_version_id"],
["id"],
ondelete="CASCADE",
)
batch_op.drop_column("dag_hash")


def downgrade():
"""Unapply add dag versioning."""
with op.batch_alter_table("task_instance_history", schema=None) as batch_op:
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("task_instance_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("dag_code", schema=None) as batch_op:
batch_op.drop_column("id")
batch_op.drop_constraint(batch_op.f("dag_code_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")
batch_op.create_primary_key("dag_code_pkey", ["fileloc_hash"])

with op.batch_alter_table("serialized_dag", schema=None, naming_convention=naming_convention) as batch_op:
batch_op.drop_column("id")
batch_op.add_column(sa.Column("fileloc", sa.String(length=2000), autoincrement=False, nullable=False))
batch_op.add_column(sa.Column("fileloc_hash", sa.BIGINT(), autoincrement=False, nullable=False))
batch_op.create_index("idx_fileloc_hash", ["fileloc_hash"], unique=False)
batch_op.create_primary_key("serialized_dag_pkey", ["dag_id"])
batch_op.drop_constraint(batch_op.f("serialized_dag_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

with op.batch_alter_table("dag_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("dag_hash", sa.String(length=32), autoincrement=False, nullable=True))
batch_op.drop_constraint(batch_op.f("dag_run_dag_version_id_fkey"), type_="foreignkey")
batch_op.drop_column("dag_version_id")

op.drop_table("dag_version")
1 change: 1 addition & 0 deletions airflow/models/__init__.py
@@ -57,6 +57,7 @@ def import_all_models():

import airflow.models.asset
import airflow.models.backfill
+ import airflow.models.dag_version
import airflow.models.dagwarning
import airflow.models.errors
import airflow.models.serialized_dag
6 changes: 4 additions & 2 deletions airflow/models/backfill.py
@@ -43,6 +43,7 @@
from airflow.api_connexion.exceptions import NotFound
from airflow.exceptions import AirflowException
from airflow.models.base import Base, StringID
+ from airflow.models.dag_version import DagVersion
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
@@ -200,7 +201,7 @@ def _create_backfill_dag_run(
)
)
return

+ dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
execution_date=info.logical_date,
@@ -213,6 +214,7 @@
creating_job_id=None,
session=session,
backfill_id=backfill_id,
+ dag_version=dag_version,
)
session.add(
BackfillDagRun(
@@ -253,7 +255,7 @@ def _create_backfill(
from airflow.models.serialized_dag import SerializedDagModel

with create_session() as session:
- serdag = session.get(SerializedDagModel, dag_id)
+ serdag = session.scalar(SerializedDagModel.latest_item_select_object(dag_id))
if not serdag:
raise NotFound(f"Could not find dag {dag_id}")
# todo: if dag has no schedule, raise
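`SerializedDagModel.latest_item_select_object` is added elsewhere in this PR (see the final commit, "Move getting latest serdags to SerializedDagModel") and is not shown here. With `dag_id` no longer the primary key, several serialized rows can exist per DAG, so a plausible shape for it is an ordered, limited select (sketch only; the ordering column is an assumption):

```python
from sqlalchemy import select

from airflow.models.serialized_dag import SerializedDagModel


def latest_serdag_select(dag_id: str):
    # Newest serialized row for the dag; assumes last_updated still exists
    # on serialized_dag (the migration only drops fileloc/fileloc_hash).
    return (
        select(SerializedDagModel)
        .where(SerializedDagModel.dag_id == dag_id)
        .order_by(SerializedDagModel.last_updated.desc())
        .limit(1)
    )
```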