Skip to content

Add back support to be able to return tuples in xcoms #47195

@amoghrajesh

Description

@amoghrajesh

Body

As part of AIP-72: Simplify the XCOM interface between task sdk and execution API #46719, we simplifies the interactions between the task runner and the task execution API to interact in JSON supported objects and not serialised json strings.

This leads into a side effect -- tuples arent handled well.

The kind of error we run into (this is in the task execution api server)

│                                                                                                  │
│ /opt/airflow/airflow/api_fastapi/execution_api/routes/xcoms.py:228 in set_xcom                   │
│                                                                                                  │
│   225 │                                                                                          │
│   226 │   # We use `BaseXCom.set` to set XComs directly to the database, bypassing the XCom Ba   │
│   227 │   try:                                                                                   │
│ ❱ 228 │   │   BaseXCom.set(                                                                      │
│   229 │   │   │   key=key,                                                                       │
│   230 │   │   │   value=value,                                                                   │
│   231 │   │   │   dag_id=dag_id,                                                                 │
│                                                                                                  │
│ ╭─────────────────────────────────────── locals ───────────────────────────────────────╮         │
│ │          conf = <airflow.configuration.AirflowConfigParser object at 0xffff8a1c9580> │         │
│ │        dag_id = 'xcom_tuple_return'                                                  │         │
│ │           key = 'return_value'                                                       │         │
│ │     map_index = -1                                                                   │         │
│ │ mapped_length = None                                                                 │         │
│ │        run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq'                  │         │
│ │       session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0>            │         │
│ │       task_id = 'tuple_task'                                                         │         │
│ │         token = TIToken(ti_key='test_key')                                           │         │
│ │         value = {                                                                    │         │
│ │                 │   '__classname__': 'builtins.tuple',                               │         │
│ │                 │   '__version__': 1,                                                │         │
│ │                 │   '__data__': ['Hello', 'XCom!']                                   │         │
│ │                 }                                                                    │         │
│ ╰──────────────────────────────────────────────────────────────────────────────────────╯         │
│                                                                                                  │
│ /opt/airflow/airflow/utils/session.py:98 in wrapper                                              │
│                                                                                                  │
│    95 │   @wraps(func)                                                                           │
│    96 │   def wrapper(*args, **kwargs) -> RT:                                                    │
│    97 │   │   if "session" in kwargs or session_args_idx < len(args):                            │
│ ❱  98 │   │   │   return func(*args, **kwargs)                                                   │
│    99 │   │   else:                                                                              │
│   100 │   │   │   with create_session() as session:                                              │
│   101 │   │   │   │   return func(*args, session=session, **kwargs)                              │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │             args = (<class 'airflow.models.xcom.BaseXCom'>,)                                 │ │
│ │           kwargs = {                                                                         │ │
│ │                    │   'key': 'return_value',                                                │ │
│ │                    │   'value': {                                                            │ │
│ │                    │   │   '__classname__': 'builtins.tuple',                                │ │
│ │                    │   │   '__version__': 1,                                                 │ │
│ │                    │   │   '__data__': ['Hello', 'XCom!']                                    │ │
│ │                    │   },                                                                    │ │
│ │                    │   'dag_id': 'xcom_tuple_return',                                        │ │
│ │                    │   'task_id': 'tuple_task',                                              │ │
│ │                    │   'run_id': 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq',        │ │
│ │                    │   'session': <sqlalchemy.orm.session.Session object at 0xffff59d667f0>, │ │
│ │                    │   'map_index': -1                                                       │ │
│ │                    }                                                                         │ │
│ │ session_args_idx = 7                                                                         │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                                  │
│ /opt/airflow/airflow/models/xcom.py:185 in set                                                   │
│                                                                                                  │
│   182 │   │   │   )                                                                              │
│   183 │   │   │   value = list(value)                                                            │
│   184 │   │                                                                                      │
│ ❱ 185 │   │   value = cls.serialize_value(                                                       │
│   186 │   │   │   value=value,                                                                   │
│   187 │   │   │   key=key,                                                                       │
│   188 │   │   │   task_id=task_id,                                                               │
│                                                                                                  │
│ ╭──────────────────────────────── locals ────────────────────────────────╮                       │
│ │     dag_id = 'xcom_tuple_return'                                       │                       │
│ │ dag_run_id = 1                                                         │                       │
│ │        key = 'return_value'                                            │                       │
│ │  map_index = -1                                                        │                       │
│ │     run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq'       │                       │
│ │    session = <sqlalchemy.orm.session.Session object at 0xffff59d667f0> │                       │
│ │    task_id = 'tuple_task'                                              │                       │
│ │      value = {                                                         │                       │
│ │              │   '__classname__': 'builtins.tuple',                    │                       │
│ │              │   '__version__': 1,                                     │                       │
│ │              │   '__data__': ['Hello', 'XCom!']                        │                       │
│ │              }                                                         │                       │
│ ╰────────────────────────────────────────────────────────────────────────╯                       │
│                                                                                                  │
│ /opt/airflow/airflow/models/xcom.py:452 in serialize_value                                       │
│                                                                                                  │
│   449 │   ) -> str:                                                                              │
│   450 │   │   """Serialize XCom value to JSON str."""                                            │
│   451 │   │   try:                                                                               │
│ ❱ 452 │   │   │   return json.dumps(value, cls=XComEncoder)                                      │
│   453 │   │   except (ValueError, TypeError):                                                    │
│   454 │   │   │   raise ValueError("XCom value must be JSON serializable")                       │
│   455                                                                                            │
│                                                                                                  │
│ ╭──────────────────────────── locals ─────────────────────────────╮                              │
│ │    dag_id = 'xcom_tuple_return'                                 │                              │
│ │       key = 'return_value'                                      │                              │
│ │ map_index = -1                                                  │                              │
│ │    run_id = 'manual__2025-02-28T06:52:36.257667+00:00_AI3PEQTq' │                              │
│ │   task_id = 'tuple_task'                                        │                              │
│ │     value = {                                                   │                              │
│ │             │   '__classname__': 'builtins.tuple',              │                              │
│ │             │   '__version__': 1,                               │                              │
│ │             │   '__data__': ['Hello', 'XCom!']                  │                              │
│ │             }                                                   │                              │
│ ╰─────────────────────────────────────────────────────────────────╯                              │
│                                                                                                  │
│ /usr/local/lib/python3.9/json/__init__.py:234 in dumps                                           │
│                                                                                                  │
│   231 │   │   return _default_encoder.encode(obj)                                                │
│   232 │   if cls is None:                                                                        │
│   233 │   │   cls = JSONEncoder                                                                  │
│ ❱ 234 │   return cls(                                                                            │
│   235 │   │   skipkeys=skipkeys, ensure_ascii=ensure_ascii,                                      │
│   236 │   │   check_circular=check_circular, allow_nan=allow_nan, indent=indent,                 │
│   237 │   │   separators=separators, default=default, sort_keys=sort_keys,                       │
│                                                                                                  │
│ ╭──────────────────────── locals ─────────────────────────╮                                      │
│ │      allow_nan = True                                   │                                      │
│ │ check_circular = True                                   │                                      │
│ │        default = None                                   │                                      │
│ │   ensure_ascii = True                                   │                                      │
│ │         indent = None                                   │                                      │
│ │             kw = {}                                     │                                      │
│ │            obj = {                                      │                                      │
│ │                  │   '__classname__': 'builtins.tuple', │                                      │
│ │                  │   '__version__': 1,                  │                                      │
│ │                  │   '__data__': ['Hello', 'XCom!']     │                                      │
│ │                  }                                      │                                      │
│ │     separators = None                                   │                                      │
│ │       skipkeys = False                                  │                                      │
│ │      sort_keys = False                                  │                                      │
│ ╰─────────────────────────────────────────────────────────╯                                      │
│                                                                                                  │
│ /opt/airflow/airflow/utils/json.py:99 in encode                                                  │
│                                                                                                  │
│    96 │   def encode(self, o: Any) -> str:                                                       │
│    97 │   │   # checked here and in serialize                                                    │
│    98 │   │   if isinstance(o, dict) and (CLASSNAME in o or SCHEMA_ID in o):                     │
│ ❱  99 │   │   │   raise AttributeError(f"reserved key {CLASSNAME} found in dict to serialize")   │
│   100 │   │                                                                                      │
│   101 │   │   # tuples are not preserved by std python serializer                                │
│   102 │   │   if isinstance(o, tuple):                                                           │
│                                                                                                  │
│ ╭─────────────────────────────────────────── locals ───────────────────────────────────────────╮ │
│ │    o = {'__classname__': 'builtins.tuple', '__version__': 1, '__data__': ['Hello', 'XCom!']} │ │
│ │ self = <airflow.utils.json.XComEncoder object at 0xffff599de6a0>                             │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────╯ │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: reserved key __classname__ found in dict to serialize

To repro, use any simple dag that returns a tuple xcom. Example:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ("Hello", "XCom!")
    return value

with DAG(
    'xcom_tuple_return',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='tuple_task',
        python_callable=push_to_xcom,
    )

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.

Metadata

Metadata

Assignees

Labels

area:task-execution-interface-aip72AIP-72: Task Execution Interface (TEI) aka Task SDKarea:task-sdkkind:bugThis is a clearly a bugpriority:highHigh priority bug that should be patched quickly but does not require immediate new release

Type

No type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions