8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
@@ -1540,6 +1540,7 @@ repos:
^airflow-core/tests/unit/core/test_configuration\.py$|
^airflow-core/tests/unit/models/test_renderedtifields\.py$|
^airflow-core/tests/unit/models/test_variable\.py$

- id: check-sdk-imports
name: Check for SDK imports in core files
entry: ./scripts/ci/prek/check_sdk_imports.py
@@ -1638,3 +1639,10 @@ repos:
^airflow-core/src/airflow/utils/trigger_rule\.py$|
^airflow-core/src/airflow/utils/types\.py$
## ONLY ADD PREK HOOKS HERE THAT REQUIRE CI IMAGE
- id: check-schema-defaults
name: Check schema defaults match server-side defaults
entry: ./scripts/ci/prek/check_schema_defaults.py
language: python
files: ^airflow-core/src/airflow/serialization/schema\.json$|^airflow-core/src/airflow/serialization/serialized_objects\.py$
pass_filenames: false
require_serial: true
137 changes: 137 additions & 0 deletions airflow-core/docs/administration-and-deployment/dag-serialization.rst
@@ -119,3 +119,140 @@ define a ``json`` variable in local Airflow settings (``airflow_local_settings.p

See :ref:`Configuring local settings <set-config:configuring-local-settings>` for details on how to
configure local settings.


.. _dag-serialization-defaults:

DAG Serialization with Default Values (Airflow 3.1+)
------------------------------------------------------

Starting with Airflow 3.1, DAG serialization establishes a versioned contract between Task SDKs
and Airflow server components (Scheduler & API-Server). Combined with the Task Execution API, this
decouples client and server components, enabling independent deployments and upgrades while maintaining
backward compatibility and automatic default value resolution.

How Default Values Work
~~~~~~~~~~~~~~~~~~~~~~~

When Airflow server components process serialized DAGs, they apply default values in the following order of precedence:

1. **Schema defaults**: Built-in Airflow defaults (lowest priority)
2. **Client defaults**: SDK-specific defaults
3. **DAG default_args**: DAG-level settings (existing behavior)
4. **Partial arguments**: MappedOperator shared values
5. **Task values**: Explicit task settings (highest priority)

This means you can set defaults at different levels and more specific settings will override
more general ones.
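
The resolution can be pictured as a simple lookup chain. The sketch below is illustrative only
(it is not Airflow's actual implementation); the dictionaries stand in for the five levels listed above:

.. code-block:: python

    # Illustrative only: resolve one attribute by walking the precedence chain
    # from most specific (task values) to least specific (schema defaults).
    def resolve(field, task_values, partial_args, dag_default_args, client_defaults, schema_defaults):
        for layer in (task_values, partial_args, dag_default_args, client_defaults, schema_defaults):
            if field in layer:
                return layer[field]
        return None


    retries = resolve(
        "retries",
        task_values={},                                                # nothing set on the task itself
        partial_args={},                                               # not a mapped task
        dag_default_args={"retries": 3},                               # DAG-level default_args
        client_defaults={"retry_delay": 300.0, "owner": "data_team"},  # SDK defaults
        schema_defaults={"retries": 0, "pool": "default_pool"},        # assumed built-in defaults
    )
    # retries == 3: DAG default_args wins over the lower-priority layers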

JSON Structure
~~~~~~~~~~~~~~

Serialized DAGs now include a ``client_defaults`` section that contains common default values:

.. code-block:: json

    {
        "__version": 2,
        "client_defaults": {
            "tasks": {
                "retry_delay": 300.0,
                "owner": "data_team"
            }
        },
        "dag": {
            "dag_id": "example_dag",
            "default_args": {
                "retries": 3
            },
            "tasks": [{
                "task_id": "example_task",
                "task_type": "BashOperator",
                "_task_module": "airflow.operators.bash",
                "bash_command": "echo hello",
                "owner": "specific_owner"
            }]
        }
    }

How Values Are Applied
~~~~~~~~~~~~~~~~~~~~~~

In the example above, the task ``example_task`` will have these final values:

- **retry_delay**: 300.0 (from client_defaults.tasks)
- **owner**: "data_team" (from client_defaults.tasks)
- **retries**: 3 (from dag.default_args, overrides client_defaults)
- **bash_command**: "echo hello" (explicit task value)
- **pool**: "default_pool" (from schema defaults)

The system automatically fills in any missing values by walking up the hierarchy.
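
For illustration, the following sketch applies that fill-in to the JSON document shown above. It is
not Airflow's actual code, and the ``schema_defaults`` mapping is an assumed stand-in for the
built-in defaults:

.. code-block:: python

    # Illustrative only: fill a task's missing fields from the serialized document.
    serialized = {
        "client_defaults": {"tasks": {"retry_delay": 300.0, "owner": "data_team"}},
        "dag": {
            "default_args": {"retries": 3},
            "tasks": [{"task_id": "example_task", "bash_command": "echo hello", "owner": "specific_owner"}],
        },
    }
    schema_defaults = {"pool": "default_pool"}  # assumed built-in default

    task = dict(serialized["dag"]["tasks"][0])
    for layer in (
        serialized["dag"]["default_args"],       # DAG default_args
        serialized["client_defaults"]["tasks"],  # SDK client defaults
        schema_defaults,                         # built-in schema defaults
    ):
        for key, value in layer.items():
            task.setdefault(key, value)          # only fills fields that are still missing

    # task now carries owner="specific_owner", retries=3, retry_delay=300.0, pool="default_pool"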

MappedOperator Default Handling
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

MappedOperators (dynamic task mapping) also participate in the default value system:

.. code-block:: python

    # DAG Definition
    BashOperator.partial(task_id="mapped_task", retries=2, owner="team_lead").expand(
        bash_command=["echo 1", "echo 2", "echo 3"]
    )

In this example, each of the three generated task instances will inherit:

- **retries**: 2 (from partial arguments)
- **owner**: "team_lead" (from partial arguments)
- **pool**: "default_pool" (from client_defaults, since not specified in partial)
- **bash_command**: "echo 1", "echo 2", or "echo 3" respectively (from expand)
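
On the scheduler side, this inheritance appears as a plain fallback on the mapped operator's stored
``partial_kwargs``. The snippet below is a simplified sketch of that pattern; the ``ServerDefaults``
class is a hypothetical stand-in for the server-side defaults, not a real Airflow class:

.. code-block:: python

    # Simplified sketch of the partial_kwargs fallback used for mapped tasks.
    partial_kwargs = {"retries": 2, "owner": "team_lead"}  # captured by .partial(...)


    class ServerDefaults:
        retries = 0
        owner = "airflow"
        pool = "default_pool"


    retries = partial_kwargs.get("retries", ServerDefaults.retries)  # 2, from partial()
    owner = partial_kwargs.get("owner", ServerDefaults.owner)        # "team_lead", from partial()
    pool = partial_kwargs.get("pool", ServerDefaults.pool)           # "default_pool", server default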

Independent Deployment Architecture
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Decoupled Components:**
The serialization contract, combined with the Task Execution API, enables complete separation between:

- **Server Components** (Scheduler, API-Server): Handle orchestration, don't run user code
- **Client Components** (Task SDK, DAG processing): Run user code in isolated environments

**Key Benefits:**

- **Independent upgrades**: Upgrade server components without touching user environments
- **Version compatibility**: Single server version supports multiple SDK versions simultaneously
- **Deployment flexibility**: Server and client components can be deployed and scaled separately
- **Security isolation**: User code runs only in client environments, never on server components
- **Multi-language SDK support**: Any language can implement a compliant Task SDK

**SDK Requirements:**
Any Task SDK implementation must:

1. **Follow published schemas** (see the validation sketch below):

   - DAG serialization: produce JSON that validates against the published schema, for example ``https://airflow.apache.org/schemas/dag-serialization/v2.json``
   - Task execution: support runtime communication via the Execution API schema, for example ``https://airflow.apache.org/schemas/execution-api/2025-05-20.json``

2. **Include client_defaults** (optional): provide SDK-specific defaults in the ``client_defaults.tasks`` section
3. **Use proper versioning**: include the ``__version`` field to indicate the serialization format
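
As a rough illustration of the schema requirement, an SDK's test suite might validate its serialized
output with the third-party ``jsonschema`` package. The document below is a minimal stand-in; whether
it passes depends on the fields the published schema actually requires:

.. code-block:: python

    # Illustrative only: check an SDK-produced document against a local copy of the schema.
    import json

    import jsonschema

    with open("dag-serialization-v2.json") as f:  # local copy of the published schema
        schema = json.load(f)

    serialized_dag = {
        "__version": 2,
        "client_defaults": {"tasks": {"retry_delay": 300.0}},
        "dag": {"dag_id": "example_dag", "tasks": []},
    }

    # Raises jsonschema.exceptions.ValidationError if the document does not conform.
    jsonschema.validate(instance=serialized_dag, schema=schema)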

**Server Guarantees:**
As long as SDKs conform to both schema contracts, Airflow server components will:

- Correctly deserialize DAGs from any compliant SDK
- Support task execution communication during runtime
- Apply appropriate default values according to the hierarchy
- Maintain compatibility across SDK versions and languages

Implementation Status
~~~~~~~~~~~~~~~~~~~~~

**Current State (Airflow 3.1):**
The serialization contract establishes the foundation for client/server decoupling. While some
server components still contain Task SDK code (and vice versa), the contract ensures that:

- **Schema compliance** enables independent deployment when components are separated
- **Version compatibility** works regardless of code coupling
- **Deployment separation** is architecturally supported even if not yet fully implemented

**Future Evolution:**
Complete code decoupling between server and client components is planned for future releases.
The schema contract provides the stable interface that will remain consistent as this evolution
continues.
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -897,7 +897,7 @@ def process_executor_events(
ti.set_state(state)
continue
ti.task = task
if task.on_retry_callback or task.on_failure_callback:
if task.has_on_retry_callback or task.has_on_failure_callback:
# Only log the error/extra info here, since the `ti.handle_failure()` path will log it
# too, which would lead to double logging
cls.logger().error(msg)
@@ -2033,7 +2033,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, executor):
exc_info=True,
)
else:
if task.on_failure_callback:
if task.has_on_failure_callback:
if inspect(ti).detached:
ti = session.merge(ti)
request = TaskCallbackRequest(
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/models/dagrun.py
@@ -1998,8 +1998,8 @@ def schedule_tis(
assert isinstance(task, Operator)
if (
task.inherits_from_empty_operator
and not task.on_execute_callback
and not task.on_success_callback
and not task.has_on_execute_callback
and not task.has_on_success_callback
and not task.outlets
and not task.inlets
):
101 changes: 58 additions & 43 deletions airflow-core/src/airflow/models/mappedoperator.py
@@ -30,19 +30,7 @@
from airflow.exceptions import AirflowException
from airflow.sdk import BaseOperator as TaskSDKBaseOperator
from airflow.sdk.definitions._internal.abstractoperator import (
DEFAULT_EXECUTOR,
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
DEFAULT_POOL_NAME,
DEFAULT_POOL_SLOTS,
DEFAULT_PRIORITY_WEIGHT,
DEFAULT_QUEUE,
DEFAULT_RETRIES,
DEFAULT_RETRY_DELAY,
DEFAULT_TRIGGER_RULE,
DEFAULT_WEIGHT_RULE,
NotMapped,
TaskStateChangeCallbackAttrType,
)
from airflow.sdk.definitions._internal.node import DAGNode
from airflow.sdk.definitions.mappedoperator import MappedOperator as TaskSDKMappedOperator
@@ -91,6 +79,7 @@ def is_mapped(task: Operator) -> TypeGuard[MappedOperator]:
class MappedOperator(DAGNode):
"""Object representing a mapped operator in a DAG."""

# Stores minimal class type information (task_type, _operator_name) instead of full serialized data
operator_class: dict[str, Any]
partial_kwargs: dict[str, Any] = attrs.field(init=False, factory=dict)

@@ -107,10 +96,10 @@ class MappedOperator(DAGNode):
_can_skip_downstream: bool = attrs.field(alias="can_skip_downstream")
_is_sensor: bool = attrs.field(alias="is_sensor", default=False)
_task_module: str
_task_type: str
task_type: str
_operator_name: str
start_trigger_args: StartTriggerArgs | None
start_from_trigger: bool
start_trigger_args: StartTriggerArgs | None = None
start_from_trigger: bool = False
_needs_expansion: bool = True

dag: SchedulerDAG = attrs.field(init=False)
@@ -154,11 +143,6 @@ def leaves(self) -> Sequence[DAGNode]:
# TODO (GH-52141): Review if any of the properties below are used in the
# SDK and the scheduler, and remove those not needed.

@property
def task_type(self) -> str:
"""Implementing Operator."""
return self._task_type

@property
def operator_name(self) -> str:
return self._operator_name
@@ -186,11 +170,11 @@ def inherits_from_skipmixin(self) -> bool:

@property
def owner(self) -> str:
return self.partial_kwargs.get("owner", DEFAULT_OWNER)
return self.partial_kwargs.get("owner", SerializedBaseOperator.owner)

@property
def trigger_rule(self) -> TriggerRule:
return self.partial_kwargs.get("trigger_rule", DEFAULT_TRIGGER_RULE)
return self.partial_kwargs.get("trigger_rule", SerializedBaseOperator.trigger_rule)

@property
def is_setup(self) -> bool:
@@ -206,7 +190,9 @@ def depends_on_past(self) -> bool:

@property
def ignore_first_depends_on_past(self) -> bool:
value = self.partial_kwargs.get("ignore_first_depends_on_past", DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST)
value = self.partial_kwargs.get(
"ignore_first_depends_on_past", SerializedBaseOperator.ignore_first_depends_on_past
)
return bool(value)

@property
Expand All @@ -215,19 +201,19 @@ def wait_for_downstream(self) -> bool:

@property
def retries(self) -> int:
return self.partial_kwargs.get("retries", DEFAULT_RETRIES)
return self.partial_kwargs.get("retries", SerializedBaseOperator.retries)

@property
def queue(self) -> str:
return self.partial_kwargs.get("queue", DEFAULT_QUEUE)
return self.partial_kwargs.get("queue", SerializedBaseOperator.queue)

@property
def pool(self) -> str:
return self.partial_kwargs.get("pool", DEFAULT_POOL_NAME)
return self.partial_kwargs.get("pool", SerializedBaseOperator.pool)

@property
def pool_slots(self) -> int:
return self.partial_kwargs.get("pool_slots", DEFAULT_POOL_SLOTS)
return self.partial_kwargs.get("pool_slots", SerializedBaseOperator.pool_slots)

@property
def resources(self) -> Resources | None:
@@ -242,36 +228,36 @@ def max_active_tis_per_dagrun(self) -> int | None:
return self.partial_kwargs.get("max_active_tis_per_dagrun")

@property
def on_execute_callback(self) -> TaskStateChangeCallbackAttrType:
return self.partial_kwargs.get("on_execute_callback") or []
def has_on_execute_callback(self) -> bool:
return bool(self.partial_kwargs.get("has_on_execute_callback", False))

@property
def on_failure_callback(self) -> TaskStateChangeCallbackAttrType:
return self.partial_kwargs.get("on_failure_callback") or []
def has_on_failure_callback(self) -> bool:
return bool(self.partial_kwargs.get("has_on_failure_callback", False))

@property
def on_retry_callback(self) -> TaskStateChangeCallbackAttrType:
return self.partial_kwargs.get("on_retry_callback") or []
def has_on_retry_callback(self) -> bool:
return bool(self.partial_kwargs.get("has_on_retry_callback", False))

@property
def on_success_callback(self) -> TaskStateChangeCallbackAttrType:
return self.partial_kwargs.get("on_success_callback") or []
def has_on_success_callback(self) -> bool:
return bool(self.partial_kwargs.get("has_on_success_callback", False))

@property
def on_skipped_callback(self) -> TaskStateChangeCallbackAttrType:
return self.partial_kwargs.get("on_skipped_callback") or []
def has_on_skipped_callback(self) -> bool:
return bool(self.partial_kwargs.get("has_on_skipped_callback", False))

@property
def run_as_user(self) -> str | None:
return self.partial_kwargs.get("run_as_user")

@property
def priority_weight(self) -> int:
return self.partial_kwargs.get("priority_weight", DEFAULT_PRIORITY_WEIGHT)
return self.partial_kwargs.get("priority_weight", SerializedBaseOperator.priority_weight)

@property
def retry_delay(self) -> datetime.timedelta:
return self.partial_kwargs.get("retry_delay", DEFAULT_RETRY_DELAY)
return self.partial_kwargs["retry_delay"]

@property
def retry_exponential_backoff(self) -> bool:
@@ -280,12 +266,12 @@ def retry_exponential_backoff(self) -> bool:
@property
def weight_rule(self) -> PriorityWeightStrategy:
return validate_and_load_priority_weight_strategy(
self.partial_kwargs.get("weight_rule", DEFAULT_WEIGHT_RULE)
self.partial_kwargs.get("weight_rule", SerializedBaseOperator._weight_rule)
)

@property
def executor(self) -> str | None:
return self.partial_kwargs.get("executor", DEFAULT_EXECUTOR)
return self.partial_kwargs.get("executor")

@property
def executor_config(self) -> dict:
@@ -311,8 +297,37 @@ def on_failure_fail_dagrun(self) -> bool:
def on_failure_fail_dagrun(self, v) -> None:
self.partial_kwargs["on_failure_fail_dagrun"] = bool(v)

def get_serialized_fields(self):
return TaskSDKMappedOperator.get_serialized_fields()
@classmethod
def get_serialized_fields(cls):
"""Fields to extract from JSON-Serialized DAG."""
return frozenset(
{
"_disallow_kwargs_override",
"_expand_input_attr",
"_is_sensor",
"_needs_expansion",
"_operator_name",
"_task_module",
"downstream_task_ids",
"end_date",
"operator_extra_links",
"params",
"partial_kwargs",
"start_date",
"start_from_trigger",
"start_trigger_args",
"task_id",
"task_type",
"template_ext",
"template_fields",
"template_fields_renderers",
"ui_color",
"ui_fgcolor",
# TODO: Need to verify if the following two are needed on the server side.
"expand_input",
"op_kwargs_expand_input",
}
)

@functools.cached_property
def operator_extra_link_dict(self) -> dict[str, BaseOperatorLink]: