1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -1597,6 +1597,7 @@ repos:
^airflow-core/src/airflow/models/__init__\.py$|
^airflow-core/src/airflow/models/asset\.py$|
^airflow-core/src/airflow/models/baseoperator\.py$|
^airflow-core/src/airflow/models/callback\.py$|
^airflow-core/src/airflow/models/connection\.py$|
^airflow-core/src/airflow/models/dag\.py$|
^airflow-core/src/airflow/models/dagrun\.py$|
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
62f6d0409a5328d95254ca95018161f705f751b24768f0f4b86a736766143959
87f70348b8f566de55d6e9704925ca89359d92d6045f20e37f17e15989067f76
4,247 changes: 2,135 additions & 2,112 deletions airflow-core/docs/img/airflow_erd.svg
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``69ddce9a7247`` (head) | ``5cc8117e9285`` | ``3.2.0`` | Add ``fail_fast`` column to dag table. |
| ``b87d2135fa50`` (head) | ``69ddce9a7247`` | ``3.2.0`` | Restructure callback table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``69ddce9a7247`` | ``5cc8117e9285`` | ``3.2.0`` | Add ``fail_fast`` column to dag table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``5cc8117e9285`` | ``1b2c3d4e5f6g`` | ``3.2.0`` | Add Human In the Loop Detail History table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
71 changes: 57 additions & 14 deletions airflow-core/src/airflow/executors/workloads.py
@@ -18,6 +18,7 @@

import os
import uuid
from abc import ABC
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Literal
@@ -27,14 +28,13 @@

if TYPE_CHECKING:
from airflow.api_fastapi.auth.tokens import JWTGenerator
from airflow.models import DagRun
from airflow.models.callback import Callback as CallbackModel, CallbackFetchMethod
from airflow.models.taskinstance import TaskInstance as TIModel
from airflow.models.taskinstancekey import TaskInstanceKey


__all__ = [
"All",
"ExecuteTask",
]
__all__ = ["All", "ExecuteTask", "ExecuteCallback"]

log = structlog.get_logger(__name__)

@@ -43,6 +43,10 @@ class BaseWorkload(BaseModel):
token: str
"""The identity token for this workload"""

@staticmethod
def generate_token(sub_id: str, generator: JWTGenerator | None = None) -> str:
return generator.generate({"sub": sub_id}) if generator else ""


class BundleInfo(BaseModel):
"""Schema for telling task which bundle to run with."""
@@ -84,11 +88,17 @@ def key(self) -> TaskInstanceKey:
)


class ExecuteTask(BaseWorkload):
"""Execute the given Task."""
class Callback(BaseModel):
"""Schema for Callback with minimal required fields needed for Executors and Task SDK."""

id: uuid.UUID
fetch_type: CallbackFetchMethod
data: dict


class BaseDagBundleWorkload(BaseWorkload, ABC):
"""Base class for Workloads that are associated with a DAG bundle."""

ti: TaskInstance
"""The TaskInstance to execute"""
dag_rel_path: os.PathLike[str]
"""The filepath where the DAG can be found (likely prefixed with `DAG_FOLDER/`)"""

@@ -97,6 +107,12 @@ class ExecuteTask(BaseWorkload):
log_path: str | None
"""The rendered relative log filename template the task logs should be written to"""


class ExecuteTask(BaseDagBundleWorkload):
"""Execute the given Task."""

ti: TaskInstance

type: Literal["ExecuteTask"] = Field(init=False, default="ExecuteTask")

@classmethod
@@ -107,8 +123,6 @@ def make(
generator: JWTGenerator | None = None,
bundle_info: BundleInfo | None = None,
) -> ExecuteTask:
from pathlib import Path

from airflow.utils.helpers import log_filename_template_renderer

ser_ti = TaskInstance.model_validate(ti, from_attributes=True)
@@ -119,14 +133,43 @@
version=ti.dag_run.bundle_version,
)
fname = log_filename_template_renderer()(ti=ti)
token = ""

if generator:
token = generator.generate({"sub": str(ti.id)})
return cls(
ti=ser_ti,
dag_rel_path=dag_rel_path or Path(ti.dag_model.relative_fileloc),
token=token,
token=cls.generate_token(str(ti.id), generator),
log_path=fname,
bundle_info=bundle_info,
)


class ExecuteCallback(BaseDagBundleWorkload):
"""Execute the given Callback."""

callback: Callback

type: Literal["ExecuteCallback"] = Field(init=False, default="ExecuteCallback")

@classmethod
def make(
cls,
callback: CallbackModel,
dag_run: DagRun,
dag_rel_path: Path | None = None,
generator: JWTGenerator | None = None,
bundle_info: BundleInfo | None = None,
) -> ExecuteCallback:
if not bundle_info:
bundle_info = BundleInfo(
name=dag_run.dag_model.bundle_name,
version=dag_run.bundle_version,
)
fname = f"executor_callbacks/{callback.id}" # TODO: better log file template

return cls(
callback=Callback.model_validate(callback, from_attributes=True),
dag_rel_path=dag_rel_path or Path(dag_run.dag_model.relative_fileloc),
token=cls.generate_token(str(callback.id), generator),
log_path=fname,
bundle_info=bundle_info,
)
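
For context, a minimal sketch of how an executor-side caller might build and serialize the new workload, assuming a Callback ORM row and its DagRun are already loaded from the metadata database; the queue_callback helper below is hypothetical, only ExecuteCallback.make and the pydantic serialization come from the code above:

# Hypothetical helper, not part of this PR: build an ExecuteCallback workload
# from a Callback ORM row and its DagRun, then serialize it for the worker.
from airflow.executors import workloads


def queue_callback(callback_row, dag_run, jwt_generator=None):
    workload = workloads.ExecuteCallback.make(
        callback=callback_row,    # airflow.models.callback.Callback row
        dag_run=dag_run,          # used to derive bundle_info and dag_rel_path
        generator=jwt_generator,  # optional; token is "" when omitted
    )
    # Workloads are pydantic models, so they round-trip through JSON the same
    # way ExecuteTask does when handed to a worker process.
    return workload.model_dump_json()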
@@ -0,0 +1,101 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Restructure callback table.

Revision ID: b87d2135fa50
Revises: 69ddce9a7247
Create Date: 2025-09-10 13:58:23.435028

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy_utils import UUIDType

import airflow

# revision identifiers, used by Alembic.
revision = "b87d2135fa50"
down_revision = "69ddce9a7247"
branch_labels = None
depends_on = None
airflow_version = "3.2.0"


def upgrade():
    """
    Restructure callback table.

    Add/drop/modify columns, create a foreign key to trigger, and rename the table.
    """
    # Drop all rows. This only affects users who have Dag Processor callbacks in non-terminal states
    # during the migration.
    op.execute("DELETE FROM callback_request")
    # TODO: migrate existing rows for pending DagProcessor callbacks

    op.rename_table("callback_request", "callback")
    with op.batch_alter_table("callback", schema=None) as batch_op:
        batch_op.add_column(sa.Column("type", sa.String(length=20), nullable=False))
        batch_op.add_column(sa.Column("fetch_method", sa.String(length=20), nullable=False))
        batch_op.add_column(sa.Column("data", airflow.utils.sqlalchemy.ExtendedJSON(), nullable=True))
        batch_op.add_column(sa.Column("state", sa.String(length=10), nullable=True))
        batch_op.add_column(sa.Column("output", sa.Text(), nullable=True))
        batch_op.add_column(sa.Column("trigger_id", sa.Integer(), nullable=True))
        batch_op.create_foreign_key(batch_op.f("callback_trigger_id_fkey"), "trigger", ["trigger_id"], ["id"])

        # Replace INTEGER id with UUID id
        batch_op.drop_column("id")
        batch_op.add_column(sa.Column("id", UUIDType(binary=False), nullable=False, primary_key=True))

        batch_op.drop_column("callback_data")
        batch_op.drop_column("callback_type")


def downgrade():
    """
    Unapply the callback table restructure.

    Add/drop/modify columns, drop the foreign key to trigger, and rename the table back.
    """
    op.execute("DELETE FROM callback")  # Drop all rows. See comment in upgrade().

    with op.batch_alter_table("callback", schema=None) as batch_op:
        batch_op.add_column(sa.Column("callback_type", sa.String(length=20), nullable=False))
        batch_op.add_column(
            sa.Column("callback_data", airflow.utils.sqlalchemy.ExtendedJSON(), nullable=False)
        )

        # Replace UUID id with INTEGER id
        batch_op.drop_column("id")
        batch_op.add_column(sa.Column("id", sa.INTEGER(), nullable=False, autoincrement=True))
        # MySQL requires the primary key constraint to be created separately when autoincrement=True
        batch_op.create_primary_key("callback_request_pkey", ["id"])

        batch_op.drop_constraint(batch_op.f("callback_trigger_id_fkey"), type_="foreignkey")
        batch_op.drop_column("trigger_id")
        batch_op.drop_column("output")
        batch_op.drop_column("state")
        batch_op.drop_column("data")
        batch_op.drop_column("fetch_method")
        batch_op.drop_column("type")

    op.rename_table("callback", "callback_request")
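
As a hedged sanity check after applying this revision (e.g. via airflow db migrate), the renamed table and new columns can be inspected with SQLAlchemy; the connection URL below is an assumed local setup, and the expected column set is read off the migration above:

# Hypothetical post-migration check, not part of this PR.
import sqlalchemy as sa

engine = sa.create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")  # assumed local DB
inspector = sa.inspect(engine)

assert "callback" in inspector.get_table_names()  # renamed from callback_request
columns = {col["name"] for col in inspector.get_columns("callback")}
assert {"id", "type", "fetch_method", "data", "state", "output", "trigger_id"} <= columns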
2 changes: 2 additions & 0 deletions airflow-core/src/airflow/models/__init__.py
@@ -97,6 +97,7 @@ def __getattr__(name):
"BaseOperator": "airflow.sdk",
"BaseOperatorLink": "airflow.sdk",
"BaseXCom": "airflow.sdk.bases.xcom",
"Callback": "airflow.models.callback",
"Connection": "airflow.models.connection",
"DagBag": "airflow.models.dagbag",
"DagModel": "airflow.models.dag",
@@ -125,6 +126,7 @@ def __getattr__(name):
# I was unable to get mypy to respect a airflow/models/__init__.pyi, so
# having to resort back to this hacky method
from airflow.models.base import ID_LEN, Base
from airflow.models.callback import Callback
from airflow.models.connection import Connection
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagbag import DagBag
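
A small sketch of what the lazy-import entry above means in practice, assuming the existing __getattr__ machinery in airflow.models resolves it; both spellings should yield the same class:

# Both access paths point at the same model class: the lazy mapping handled by
# airflow.models.__getattr__ and the direct module import added in this PR.
from airflow.models import Callback
from airflow.models.callback import Callback as DirectCallback

assert Callback is DirectCallback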