
Conversation

@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Dec 22, 2025

Related PR: #47061

In this PR I intend to migrate the serialization of deferred task data (trigger_kwargs and next_kwargs) from BaseSerialization.serialize to the serde library that is already in the task SDK: #58992

Why do it?

  • The current use of BaseSerialization creates tight coupling between task-sdk and airflow-core
  • Serialization formats are inconsistent across the system: XComs use serde, triggers use BaseSerialization
  • Kwargs are double-serialized when stored in the database, explained in the next section

Changes of note / Considerations while making this change

When a task defers, Airflow saves:

  1. Trigger kwargs: the kwargs passed along to the trigger when it starts
  2. Next kwargs (resumption args): the data to pass back when the task resumes

Previously, both used BaseSerialization to store data in the DB.
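
Concretely, both payloads originate from the defer call on the worker (condensed from the task runner; `defer` is the TaskDeferred exception carrying the trigger and kwargs):

# 1. Trigger kwargs come from the trigger's own serialize() hook
classpath, trigger_kwargs = defer.trigger.serialize()
# 2. Next kwargs are whatever the task passed to self.defer(kwargs=...)
next_kwargs = defer.kwargs or {}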

This is how the current flow looks:

Forward flow (defer):

Worker [BaseSerialization.serialize()] → API Server [BaseSerialization.deserialize()] → ExtendedJSON [BaseSerialization.serialize()] → Database

Return flow (resume):

Database → ExtendedJSON [BaseSerialization.deserialize()] → API Server → Worker

Target flow with this change:

Forward flow (defer):

Worker [serde.serialize()] → Pydantic → API Server → JSON/JSONB → Database

Return flow (resume):

Database → JSON/JSONB → API Server → Worker [serde.deserialize()]

You can also see above that the API server was doing unnecessary work. When a worker sent deferred task data to the API server, the data arrived in serialized format. The API server would:

  • Deserialize the data back into Python objects (datetime, custom objects, etc.)
  • Immediately re-serialize the same data when storing it in the database through ExtendedJSON
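
For context, ExtendedJSON is roughly a SQLAlchemy TypeDecorator that runs BaseSerialization on every write and read, so the re-serialization happens invisibly at the column level. A minimal sketch of that behavior (hypothetical class name; not the exact airflow-core implementation):

from sqlalchemy import JSON
from sqlalchemy.types import TypeDecorator


class ExtendedJSONSketch(TypeDecorator):
    """Sketch: a JSON column that auto-(de)serializes via BaseSerialization."""

    impl = JSON
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # On write: re-serialize the Python objects the API server holds
        from airflow.serialization.serialized_objects import BaseSerialization

        return BaseSerialization.serialize(value) if value is not None else None

    def process_result_value(self, value, dialect):
        # On read: deserialize back into Python objects
        from airflow.serialization.serialized_objects import BaseSerialization

        return BaseSerialization.deserialize(value) if value is not None else None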

Changes

  1. The worker now serializes data with serde before sending it anywhere. The API server receives this serde-serialized data and stores it directly without any processing. It's truly a proxy now: no deserializing, no re-serializing, just storing as-is.

  2. Because of the above, ExtendedJSON (which auto-serializes) is swapped for plain JSON/JSONB columns. This removes the automatic serialization at the database layer.

  3. When a task resumes, it gets the data as stored in the database and explicitly deserializes it back to the original types.

Key design principle: serialization/deserialization is the worker's responsibility (very similar to XComs), not the API server's or the database's. The worker knows what types it's working with and when it needs them converted.
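
In code, the defer path on the worker now looks roughly like this (condensed from the _defer_task changes in task_runner.py; the exact lines appear in the revert patch under Situation 2 below):

from airflow.sdk.execution_time.comms import DeferTask
from airflow.sdk.serde import serialize as serde_serialize

# Worker side, on defer: kwargs are serde-encoded before they leave the
# process, so the API server can store them verbatim.
classpath, trigger_kwargs = defer.trigger.serialize()

msg = DeferTask(
    classpath=classpath,
    trigger_kwargs=serde_serialize(trigger_kwargs),
    trigger_timeout=defer.timeout,
    next_method=defer.method_name,
    next_kwargs=serde_serialize(defer.kwargs or {}),
)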

Testing

Example with types now

Let's assume that this is the next_kwargs passed around (trigger_kwargs will be similar):

import pendulum

val = {
    "location": ("New York", "USA"),
    "timestamp": pendulum.datetime(2024, 1, 15, 10, 30, 0, tz="UTC"),
}

Earlier storage format:

BaseSerialization.serialize(val)
Out[6]: 
{<Encoding.VAR: '__var'>: {'location': {<Encoding.VAR: '__var'>: ['New York',
    'USA'],
   <Encoding.TYPE: '__type'>: <DagAttributeTypes.TUPLE: 'tuple'>},
  'timestamp': {<Encoding.VAR: '__var'>: 1705314600.0,
   <Encoding.TYPE: '__type'>: <DagAttributeTypes.DATETIME: 'datetime'>}},
 <Encoding.TYPE: '__type'>: <DagAttributeTypes.DICT: 'dict'>}

Now:

from airflow.sdk.serde import serialize
serialize(val)
Out[8]: 
{'location': {'__classname__': 'builtins.tuple',
  '__version__': 1,
  '__data__': ['New York', 'USA']},
 'timestamp': {'__classname__': 'pendulum.datetime.DateTime',
  '__version__': 2,
  '__data__': {'timestamp': 1705314600.0,
   'tz': {'__classname__': 'builtins.tuple',
    '__version__': 1,
    '__data__': ['UTC', 'pendulum.tz.timezone.Timezone', 1, True]}}}}

Earlier this data was deserialized by BaseSerialization; now serde does it:

Earlier:

BaseSerialization.deserialize(BaseSerialization.serialize(val)) == val
Out[9]: True

Now:

from airflow.sdk.serde import serialize, deserialize
deserialize(serialize(val))
Out[11]: 
{'location': ('New York', 'USA'),
 'timestamp': DateTime(2024, 1, 15, 10, 30, 0, tzinfo=Timezone('UTC'))}
deserialize(serialize(val)) == val
Out[12]: True

Note that there is no type loss in either case.
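
During an upgrade window both envelope formats can coexist in the database, and they are easy to tell apart: BaseSerialization wraps values in exactly the __type/__var keys, while serde uses __classname__/__version__/__data__. A hypothetical helper (not part of this PR) capturing the check that the compat code relies on:

def is_base_serialization_payload(value) -> bool:
    # BaseSerialization envelopes are dicts keyed by exactly __type and __var;
    # serde envelopes carry __classname__, __version__ and __data__ instead.
    return isinstance(value, dict) and value.keys() == {"__type", "__var"}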

Compat testing

What we care about most is backward compatibility: the server has been upgraded, but the worker hasn't been (or isn't ready yet).

Situation 1

To verify that existing deferred tasks can resume after upgrading to the new serde serialization format, I ran this test:

  1. On main, I used the DAG below to create a task instance that goes into the deferred state:
from airflow import DAG
from airflow.providers.standard.sensors.date_time import DateTimeSensorAsync
from airflow.sdk import Context
from datetime import datetime
import pendulum


class MyCustomSensor(DateTimeSensorAsync):

    def execute(self, context: Context):
        from airflow.providers.standard.triggers.temporal import DateTimeTrigger

        target_time = pendulum.parse(self.target_time)

        self.defer(
            trigger=DateTimeTrigger(moment=target_time),
            method_name="execute_complete",
            kwargs={
                "numbers": [1, 2, 3, 4, 5],
                "numbers_in_tuple": (6, 7, 8, 9, 10),
                "message": "hello from deferred task",
                "timestamp": pendulum.now("UTC"),
            },
        )

    def execute_complete(self, context: Context, event=None, **kwargs):
        """Log the received data."""
        self.log.info(f"Event: {event}")
        self.log.info(f"Numbers: {kwargs.get('numbers')}")
        self.log.info(f"Numbers in tuple are: {kwargs.get('numbers_in_tuple')}")
        self.log.info(f"Message: {kwargs.get('message')}")
        self.log.info(f"Timestamp: {kwargs.get('timestamp')}")
        return True


with DAG(
    "defer_datetime",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    MyCustomSensor(
        task_id="wait_for_time",
        target_time="""{{ macros.datetime.utcnow() + macros.timedelta(minutes=1) }}""",
    )

Once the task was in the deferred state, I recorded its details:

Trigger table:

1,airflow.providers.standard.triggers.temporal.DateTimeTrigger,"{""__var"": {""moment"": {""__var"": 1766486799.605149, ""__type"": ""datetime""}, ""end_from_trigger"": false}, ""__type"": ""dict""}",2025-12-23 10:45:39.681875 +00:00,3

Task Instance Table:

{"__var": {"message": "hello from deferred task", "numbers": [1, 2, 3, 4, 5], "timestamp": {"__var": 1766486739.627232, "__type": "datetime"}, "numbers_in_tuple": {"__var": [6, 7, 8, 9, 10], "__type": "tuple"}}, "__type": "dict"}

Logs from a successful run on main:

[2025-12-23 16:16:39] INFO - yielding event with payload DateTime(2025, 12, 23, 10, 46, 39, 605149, tzinfo=Timezone('UTC')) source=airflow.providers.standard.triggers.temporal.DateTimeTrigger loc=temporal.py:85
[2025-12-23 16:16:39] INFO - Trigger fired event name=defer_datetime/manual__2025-12-23T10:45:36+00:00/wait_for_time/-1/1 (ID 1) result=TriggerEvent<DateTime(2025, 12, 23, 10, 46, 39, 605149, tzinfo=Timezone('UTC'))> loc=triggerer_job_runner.py:1115
[2025-12-23 16:16:39] INFO - trigger completed name=defer_datetime/manual__2025-12-23T10:45:36+00:00/wait_for_time/-1/1 (ID 1) loc=triggerer_job_runner.py:1136
[2025-12-23 16:16:40] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:209
[2025-12-23 16:16:40] INFO - Filling up the DagBag from /files/test_dags/deferred.py source=airflow.dag_processing.dagbag.DagBag loc=dagbag.py:627
[2025-12-23 16:16:40] INFO - Event: 2025-12-23 10:46:39.605149+00:00 source=airflow.task.operators.unusual_prefix_66243078da79234270e28439944ee81b22f98b6a_deferred.MyCustomSensor loc=deferred.py:28
[2025-12-23 16:16:40] INFO - Numbers: [1, 2, 3, 4, 5] source=airflow.task.operators.unusual_prefix_66243078da79234270e28439944ee81b22f98b6a_deferred.MyCustomSensor loc=deferred.py:29
[2025-12-23 16:16:40] INFO - Numbers in tuple are: (6, 7, 8, 9, 10) source=airflow.task.operators.unusual_prefix_66243078da79234270e28439944ee81b22f98b6a_deferred.MyCustomSensor loc=deferred.py:30
[2025-12-23 16:16:40] INFO - Message: hello from deferred task source=airflow.task.operators.unusual_prefix_66243078da79234270e28439944ee81b22f98b6a_deferred.MyCustomSensor loc=deferred.py:31
[2025-12-23 16:16:40] INFO - Timestamp: 2025-12-23 10:45:39.627232+00:00 source=airflow.task.operators.unusual_prefix_66243078da79234270e28439944ee81b22f98b6a_deferred.MyCustomSensor loc=deferred.py:32
[2025-12-23 16:16:40] INFO - Pushing xcom ti=RuntimeTaskInstance(id=UUID('019b4ad0-8081-776a-99a1-5bcdfdc216c5'), task_id='wait_for_time', dag_id='defer_datetime', run_id='manual__2025-12-23T10:45:36+00:00', try_number=1, dag_version_id=UUID('019b4ad0-196b-7152-9259-403fb2d7c3b4'), map_index=-1, hostname='ce72bab3f5a3', context_carrier={}, task=<Task(MyCustomSensor): wait_for_time>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 12, 23, 10, 46, 40, 91122, tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING: 'running'>, is_mapped=False, rendered_map_index=None, sentry_integration='') source=task loc=task_runner.py:1463
  2. Switched from the main branch to my branch by shutting breeze down and restarting on my branch without a db reset

  3. Went back to the Airflow UI to check whether anything crashed

When the datetime trigger fired, the deferred task resumed fine.
The worker's deserialization logic fell back to the BaseSerialization format, successfully deserialized next_kwargs, and passed them to execute_complete(); the task completed successfully with all data intact.
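
For reference, the worker-side fallback looks roughly like this (condensed from the resume path in task_runner.py; it is essentially the code being removed in the revert patch under Situation 2 below):

from airflow.sdk.serde import deserialize

next_kwargs_data = ti._ti_context_from_server.next_kwargs or {}
try:
    # Normal path: data written by an upgraded worker is serde-encoded
    kwargs = deserialize(next_kwargs_data)
except (ImportError, KeyError, AttributeError, TypeError):
    # Pre-upgrade data: fall back to the BaseSerialization format
    from airflow.serialization.serialized_objects import BaseSerialization

    kwargs = BaseSerialization.deserialize(next_kwargs_data)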

(screenshot)

Ran purely on the new branch:

(screenshot)

Situation 2

As mentioned by @kaxil, we also need to test and certify the scenario where the server is new but the worker is still old, i.e. it still sends BaseSerialization-encoded kwargs instead of using serde.

To test this, using the same DAG as above, I reverted all the task-sdk changes:

Subject: [PATCH] Adding a fixture for creating connections
---
Index: task-sdk/tests/task_sdk/execution_time/test_task_runner.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py	(date 1766584407206)
@@ -363,30 +363,18 @@
     )
     time_machine.move_to(instant, tick=False)
 
-    # Expected DeferTask, it is constructed by _defer_task from exception and is sent to supervisor
+    # Expected DeferTask
     expected_defer_task = DeferTask(
         state="deferred",
         classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
+        # Since we are in the task process here, we expect this to have not been encoded by serde yet
         trigger_kwargs={
-            "moment": {
-                "__classname__": "pendulum.datetime.DateTime",
-                "__version__": 2,
-                "__data__": {
-                    "timestamp": 1732233603.0,
-                    "tz": {
-                        "__classname__": "builtins.tuple",
-                        "__version__": 1,
-                        "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True],
-                    },
-                },
-            },
             "end_from_trigger": False,
+            "moment": instant + timedelta(seconds=3),
         },
         trigger_timeout=None,
         next_method="execute_complete",
         next_kwargs={},
-        rendered_map_index=None,
-        type="DeferTask",
     )
 
     # Run the task
Index: task-sdk/src/airflow/sdk/definitions/dag.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py b/task-sdk/src/airflow/sdk/definitions/dag.py
--- a/task-sdk/src/airflow/sdk/definitions/dag.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py	(date 1766584407199)
@@ -1428,7 +1428,6 @@
             ti.task = create_scheduler_operator(taskrun_result.ti.task)
 
             if ti.state == TaskInstanceState.DEFERRED and isinstance(msg, DeferTask) and run_triggerer:
-                from airflow.sdk.serde import deserialize, serialize
                 from airflow.utils.session import create_session
 
                 # API Server expects the task instance to be in QUEUED state before
@@ -1436,12 +1435,10 @@
                 ti.set_state(TaskInstanceState.QUEUED)
 
                 log.info("[DAG TEST] running trigger in line")
-                # trigger_kwargs need to be deserialized before passing to the trigger class since they are in serde encoded format
-                kwargs = deserialize(msg.trigger_kwargs)  # type: ignore[type-var]  # mypy doesn't like passing JsonValue | str to deserialize but its correct
-                trigger = import_string(msg.classpath)(**kwargs)
+                trigger = import_string(msg.classpath)(**msg.trigger_kwargs)
                 event = _run_inline_trigger(trigger, task_sdk_ti)
                 ti.next_method = msg.next_method
-                ti.next_kwargs = {"event": serialize(event.payload)} if event else msg.next_kwargs
+                ti.next_kwargs = {"event": event.payload} if event else msg.next_kwargs
                 log.info("[DAG TEST] Trigger completed")
 
                 # Set the state to SCHEDULED so that the task can be resumed.
Index: task-sdk/src/airflow/sdk/execution_time/task_runner.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py	(date 1766584407200)
@@ -962,19 +962,12 @@
     log.info("Pausing task as DEFERRED. ", dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id)
     classpath, trigger_kwargs = defer.trigger.serialize()
 
-    from typing import cast
-
-    from airflow.sdk.serde import serialize as serde_serialize
-
-    trigger_kwargs = cast("JsonValue", serde_serialize(trigger_kwargs))
-    next_kwargs = cast("JsonValue", serde_serialize(defer.kwargs or {}))
-
     msg = DeferTask(
         classpath=classpath,
         trigger_kwargs=trigger_kwargs,
         trigger_timeout=defer.timeout,
         next_method=defer.method_name,
-        next_kwargs=next_kwargs,
+        next_kwargs=defer.kwargs or {},
     )
     state = TaskInstanceState.DEFERRED
 
@@ -1382,20 +1375,10 @@
     execute = task.execute
 
     if ti._ti_context_from_server and (next_method := ti._ti_context_from_server.next_method):
-        from airflow.sdk.serde import deserialize
-
-        next_kwargs_data = ti._ti_context_from_server.next_kwargs or {}
-        try:
-            if TYPE_CHECKING:
-                assert isinstance(next_kwargs_data, dict)
-            kwargs = deserialize(next_kwargs_data)
-        except (ImportError, KeyError, AttributeError, TypeError):
-            from airflow.serialization.serialized_objects import BaseSerialization
+        from airflow.serialization.serialized_objects import BaseSerialization
 
-            kwargs = BaseSerialization.deserialize(next_kwargs_data)
+        kwargs = BaseSerialization.deserialize(ti._ti_context_from_server.next_kwargs or {})
 
-        if TYPE_CHECKING:
-            assert isinstance(kwargs, dict)
         execute = functools.partial(task.resume_execution, next_method=next_method, next_kwargs=kwargs)
 
     ctx = contextvars.copy_context()
Index: task-sdk/tests/task_sdk/execution_time/test_supervisor.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py	(date 1766584407200)
@@ -674,22 +674,13 @@
                 classpath="airflow.providers.standard.triggers.temporal.DateTimeTrigger",
                 next_method="execute_complete",
                 trigger_kwargs={
-                    "moment": {
-                        "__classname__": "pendulum.datetime.DateTime",
-                        "__version__": 2,
-                        "__data__": {
-                            "timestamp": 1730982899.0,
-                            "tz": {
-                                "__classname__": "builtins.tuple",
-                                "__version__": 1,
-                                "__data__": ["UTC", "pendulum.tz.timezone.Timezone", 1, True],
-                            },
-                        },
-                    },
-                    "end_from_trigger": False,
+                    "__type": "dict",
+                    "__var": {
+                        "moment": {"__type": "datetime", "__var": 1730982899.0},
+                        "end_from_trigger": False,
+                    },
                 },
-                trigger_timeout=None,
-                next_kwargs={},
+                next_kwargs={"__type": "dict", "__var": {}},
             ),
         )
 
Index: task-sdk/src/airflow/sdk/execution_time/comms.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py	(date 1766584407199)
@@ -60,7 +60,7 @@
 import attrs
 import msgspec
 import structlog
-from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
+from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, TypeAdapter, field_serializer
 
 from airflow.sdk.api.datamodels._generated import (
     AssetEventDagRunReference,
@@ -705,6 +705,19 @@
 
     type: Literal["DeferTask"] = "DeferTask"
 
+    @field_serializer("trigger_kwargs", "next_kwargs", check_fields=True)
+    def _serde_kwarg_fields(self, val: str | dict[str, Any] | None, _info):
+        from airflow.serialization.serialized_objects import BaseSerialization
+
+        if not isinstance(val, dict):
+            # None, or an encrypted string
+            return val
+
+        if val.keys() == {"__type", "__var"}:
+            # Already encoded.
+            return val
+        return BaseSerialization.serialize(val or {})
+
 
 class RetryTask(TIRetryStatePayload):
     """Update a task instance state to up_for_retry."""
Index: task-sdk/src/airflow/sdk/api/datamodels/_generated.py
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py
--- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py	(revision c73862f948cae4890d8506782f2d1a6dc072fa44)
+++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py	(date 1766584407198)
@@ -27,7 +27,7 @@
 
 from pydantic import AwareDatetime, BaseModel, ConfigDict, Field, JsonValue, RootModel
 
-API_VERSION: Final[str] = "2026-03-31"
+API_VERSION: Final[str] = "2025-12-08"
 
 
 class AssetAliasReferenceAssetEventDagRun(BaseModel):
@@ -181,10 +181,10 @@
     )
     state: Annotated[Literal["deferred"] | None, Field(title="State")] = "deferred"
     classpath: Annotated[str, Field(title="Classpath")]
-    trigger_kwargs: Annotated[dict[str, JsonValue] | str | None, Field(title="Trigger Kwargs")] = None
+    trigger_kwargs: Annotated[dict[str, Any] | str | None, Field(title="Trigger Kwargs")] = None
     trigger_timeout: Annotated[timedelta | None, Field(title="Trigger Timeout")] = None
     next_method: Annotated[str, Field(title="Next Method")]
-    next_kwargs: Annotated[dict[str, JsonValue] | None, Field(title="Next Kwargs")] = None
+    next_kwargs: Annotated[dict[str, Any] | None, Field(title="Next Kwargs")] = None
     rendered_map_index: Annotated[str | None, Field(title="Rendered Map Index")] = None

Once this was done, I was certain that the data sent to the API server would be BaseSerialization-encoded.

  1. Ran the DAG

  2. Checked the database:

Trigger table:

{"__var": {"moment": {"__var": 1766584500.193306, "__type": "datetime"}, "end_from_trigger": false}, "__type": "dict"}

next_kwargs in TI table:

{"__var": {"message": "hello from deferred task", "numbers": [1, 2, 3, 4, 5], "timestamp": {"__var": 1766584440.2096, "__type": "datetime"}, "numbers_in_tuple": {"__var": [6, 7, 8, 9, 10], "__type": "tuple"}}, "__type": "dict"}
  3. The task deferred fine
  4. The task resumed and ran fine

(screenshots)

What's pending

  • Run some back compat tests (old worker, new API server)
  • Newsfragment needed?


@boring-cyborg boring-cyborg bot added area:API Airflow's REST/HTTP API area:dev-tools area:task-sdk area:Triggerer backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch labels Dec 22, 2025
@amoghrajesh amoghrajesh self-assigned this Dec 23, 2025
@amoghrajesh amoghrajesh added this to the Airflow 3.2.0 milestone Dec 23, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review December 23, 2025 11:52
@amoghrajesh amoghrajesh requested a review from uranusjr December 29, 2025 09:28
@amoghrajesh amoghrajesh removed the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Dec 29, 2025
@amoghrajesh
Contributor Author

Ok, finally a green build!

@kaxil kaxil left a comment
Member

One comment about the upgrade path when there are some triggers already deferred, which is worth verifying; rest lgtm.

@amoghrajesh amoghrajesh merged commit 05ac92f into apache:main Dec 30, 2025
126 checks passed
@amoghrajesh amoghrajesh deleted the use-serde-for-next-kwargs branch December 30, 2025 07:17
Subham-KRLX pushed a commit to Subham-KRLX/airflow that referenced this pull request Jan 2, 2026
stegololz pushed a commit to stegololz/airflow that referenced this pull request Jan 9, 2026