@amoghrajesh (Contributor) commented Mar 4, 2025

closes: #45481

What?

XCom backends let us customise where XComs are stored. This is useful because:
The default XCom backend is the BaseXCom class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.

Docs: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html#custom-xcom-backends

Important changes

Models XCOM

  1. The BaseXCom model was a big model that served many purposes: ORM, operations on the table, utilities for custom XCom backends, and much more.
  2. This is not great: everything is so tightly coupled that it is hard to understand and hard to move over to the Task SDK.
  3. Using this as an opportunity to do some things:
  • Extracting XComModel out of BaseXCom
  • XComModel will only have table-related stuff and some utilities: get_many, clear and set stay, as they are hard to remove for now.
  • BaseXCom is a class that will be moved to the Task SDK under airflow.sdk.execution_time. I thought about it a bit: it's not a definition, so maybe definitions isn't the right place for it, although we can softly argue on this one.

Execution Time XCOMs

  1. New module introduced to store the definition of BaseXCom and serve as the building block for custom XCom backends
  2. This uses the new Task SDK architecture, and all the utilities defined here use the new execution API rather than direct DB access.
  3. For now, this has:

a) set - store an XCom value. For BaseXCom, store the value in the DB; for custom XCom backends, store the path in the DB. This part handles it:


        value = cls.serialize_value(
            value=value,
            key=key,
            task_id=task_id,
            dag_id=dag_id,
            run_id=run_id,
            map_index=map_index,
        )

b) get_value - retrieve an XCom value, using the new Task SDK.

c) get_one - similar to what we had earlier in models. Uses the task sdk to get an xcom value, either from DB or from the custom xcom backend if configured. That part is handled here:

        if msg.value is not None:
            return cls.deserialize_value(msg.value)
        return None

d) serialize_value - in the default (database) case, this serialises the value that gets stored in the table. Custom XCom backends override it, so there is nothing extra to worry about here.

e) deserialize_value - similar to the above case.

f) purge - used to purge an xcom entry, mainly for custom xcom backends. Empty for ORM

g) delete - used to delete an xcom. Added a new utility, will explain below why this was added.

  4. resolve_xcom_backend resolves a custom XCom class if one is provided in the conf, or otherwise returns the BaseXCom class, exactly as before.
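
To make the division of labour between set / get_one and serialize_value / deserialize_value concrete, here is a minimal sketch. It does not use Airflow at all: FakeExecutionAPI, OBJECT_STORE, SketchBaseXCom and SketchPathBackend are hypothetical names invented for illustration, and the real classes in execution_time/xcom.py talk to the supervisor rather than to a dict.

```python
from __future__ import annotations

import json
from typing import Any


class FakeExecutionAPI:
    """Hypothetical in-memory stand-in for the execution API (not an Airflow class)."""

    def __init__(self) -> None:
        self.rows: dict[tuple[str, str, str, str, int], Any] = {}


API = FakeExecutionAPI()
OBJECT_STORE: dict[str, str] = {}  # hypothetical external storage


class SketchBaseXCom:
    """Default behaviour: the value itself is what ends up in the table."""

    @staticmethod
    def serialize_value(value: Any, *, key: str | None = None, **ids: Any) -> Any:
        return value

    @staticmethod
    def deserialize_value(stored: Any) -> Any:
        return stored

    @classmethod
    def set(cls, key: str, value: Any, *, dag_id: str, task_id: str, run_id: str, map_index: int = -1) -> None:
        # The backend decides what is stored: the value, or a reference to it.
        stored = cls.serialize_value(
            value, key=key, dag_id=dag_id, task_id=task_id, run_id=run_id, map_index=map_index
        )
        API.rows[(dag_id, run_id, task_id, key, map_index)] = stored

    @classmethod
    def get_one(cls, key: str, *, dag_id: str, task_id: str, run_id: str, map_index: int = -1) -> Any:
        stored = API.rows.get((dag_id, run_id, task_id, key, map_index))
        if stored is not None:
            return cls.deserialize_value(stored)
        return None


class SketchPathBackend(SketchBaseXCom):
    """Custom backend: writes the value elsewhere and stores only a path."""

    @staticmethod
    def serialize_value(value: Any, *, key: str | None = None, **ids: Any) -> Any:
        path = f"store://{ids.get('dag_id')}/{ids.get('run_id')}/{ids.get('task_id')}/{key}"
        OBJECT_STORE[path] = json.dumps(value)
        return path

    @staticmethod
    def deserialize_value(stored: Any) -> Any:
        return json.loads(OBJECT_STORE[stored])
```

With the default class the table row holds the value; with the subclass it holds only the path, and get_one still returns the original value because it goes through deserialize_value.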

Execution Time changes

  1. Comms has a new DeleteXCom introduced
  2. Supervisor uses DeleteXCom when called
  3. Task runner
  • The xcom_pull and xcom_push now use the utilities in execution_time/xcom.py
  • We now get the xcom keys to clear when we first mark a task as running, and delete them individually from the task runner instead of doing it in the execution API server, due to a couple of issues:
  • If the /run endpoint was called multiple times, we would end up deleting some xcoms that we shouldn't, or we'd fail
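
The per-key clearing described above can be sketched roughly as follows. This is an illustration only: FakeXComStore and start_task are hypothetical names, and the no-op behaviour on a missing key is an assumption of the sketch, not something this PR states about the real delete endpoint.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeXComStore:
    """Hypothetical stand-in for the XCom table behind the execution API."""

    rows: dict[str, object] = field(default_factory=dict)

    def delete(self, key: str) -> None:
        # Assumption of this sketch: deleting a missing key is a no-op.
        self.rows.pop(key, None)


def start_task(store: FakeXComStore, xcom_keys_to_clear: list[str]) -> list[str]:
    """Delete each stale key individually, one request per key."""
    requested = []
    for key in xcom_keys_to_clear:
        store.delete(key)
        requested.append(key)
    return requested
```

Because the list of keys is handed to the task runner once and each key is deleted by its own request, marking the task as running does not itself remove anything on the server side.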

Execution API server changes

Task Instances

  1. The ti_run endpoint now sends xcom_keys_to_clear which is basically:

            query = session.query(XComModel.key).filter_by(
                dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id
            )
            if map_index is not None:
                query = query.filter_by(map_index=map_index)

            xcom_keys = session.execute(query)

  2. This is used by the task runner to send delete requests to clear these xcoms.
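
For readers less familiar with SQLAlchemy, the same filter can be sketched with the standard-library sqlite3 module. The table layout below is a simplified assumption made for this example (the real xcom table has more columns), and the helper name xcom_keys_to_clear is reused here only for illustration.

```python
from __future__ import annotations

import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified, assumed schema; not the real Airflow table definition.
conn.execute(
    'CREATE TABLE xcom (dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, "key" TEXT, value TEXT)'
)
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("d", "t", "r", -1, "return_value", "1"),
        ("d", "t", "r", -1, "example_key", "2"),
        ("d", "t", "r", 0, "return_value", "3"),
        ("d", "other", "r", -1, "return_value", "4"),
    ],
)


def xcom_keys_to_clear(
    conn: sqlite3.Connection, dag_id: str, task_id: str, run_id: str, map_index: int | None = None
) -> list[str]:
    """Return the keys stored for one task instance, optionally for one map index."""
    sql = 'SELECT "key" FROM xcom WHERE dag_id = ? AND task_id = ? AND run_id = ?'
    params: list[object] = [dag_id, task_id, run_id]
    if map_index is not None:
        # Mirrors the optional filter_by(map_index=...) step.
        sql += " AND map_index = ?"
        params.append(map_index)
    sql += ' ORDER BY "key"'  # ordering added only to make the example deterministic
    return [row[0] for row in conn.execute(sql, params)]
```

Only keys are selected, not values, so the response to the task runner stays small even when the stored XComs are large.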

Xcoms

  1. get_xcom, set_xcom now directly query the DB instead of using the xcom.set or xcom.get_many utilities.
  2. New endpoint to delete xcoms which will be called by the task runner.

Core api changes

  1. The XCom APIs in core_api have been modified to make up for the changes to the BaseXCom model.
  2. Mostly residual changes to manage the changes to the models.

Other changes

  1. Any other changes are mostly residual -- tests, devel-common, etc to adjust / make up for the changes to the models
  2. If anything is unexpected, please shout.

Testing

Situation 1: Using the database for xcoms (no xcom backend)

Using DAG:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ["This is a long message"] * 10
    return value
#
# def push_to_xcom2(**kwargs):
#     value = ["Hello, XCom2!"]
#     return value

def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])

    print("Pulled value", xcom_value1)

    topush = xcom_value1 + ["modified"] * 10
    print("Pushing value", topush)

    # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return topush

with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )

    # push_xcom_task2 = PythonOperator(
    #     task_id='push_xcom_task2',
    #     python_callable=push_to_xcom2,
    # )

    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )

    push_xcom_task >> pull_xcom_task

Success:
image

Xcom pushed by task 1:

image

Task 1 logs:
image

Sends the SetXcom call:

[2025-03-10, 11:21:06] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":[\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\"],\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"

Xcom pushed by task 2:

image

Task 2 logs:

image

Sends the GetXcom call:

Sending request json="{\"key\":\"return_value\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"type\":\"GetXCom\"}\n" source="task"

Status of the table:

1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message""]",2025-03-10 11:21:06.671134 +00:00
1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified""]",2025-03-10 11:21:07.493851 +00:00

Observe that the data is stored in the table as a native Python object (JSON compliant), and that the actual data is stored rather than a reference to it.

Situation 2: Using a custom xcom backend.

Using DAG:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ["This is a long message"] * 10
    return value
#
# def push_to_xcom2(**kwargs):
#     value = ["Hello, XCom2!"]
#     return value

def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])

    print("Pulled value", xcom_value1)

    topush = xcom_value1 + ["modified"] * 10
    print("Pushing value", topush)

    # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return topush

with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )

    # push_xcom_task2 = PythonOperator(
    #     task_id='push_xcom_task2',
    #     python_callable=push_to_xcom2,
    # )

    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )

    push_xcom_task >> pull_xcom_task

Setup

  1. Wrote a custom xcom backend that works by storing xcoms in a JSON format in the worker file system:
from __future__ import annotations

import json
import os
from typing import Any, TypeVar

from airflow.sdk.execution_time.xcom import BaseXCom
from airflow.utils.json import XComDecoder, XComEncoder

T = TypeVar("T")


class JSONFileXComBackend(BaseXCom):
    FILE_PATH = "/tmp/airflow_xcom.json"

    @staticmethod
    def _read_xcom_file() -> dict:
        """Read the XCom JSON file."""
        if not os.path.exists(JSONFileXComBackend.FILE_PATH):
            return {}
        with open(JSONFileXComBackend.FILE_PATH, "r") as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                return {}

    @staticmethod
    def _write_xcom_file(data: dict) -> None:
        """Write data to the XCom JSON file."""
        # "w" rather than "a+": appending a second JSON document would make the file unparseable
        with open(JSONFileXComBackend.FILE_PATH, "w") as f:
            json.dump(data, f, indent=4)

    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> str:
        # we will always serialize ourselves and not by BaseXCom as the deserialize method
        # from BaseXCom accepts only XCom objects and not the value directly
        s_val = json.dumps(value, cls=XComEncoder)
        s_val_encoded = s_val.encode("utf-8")

        base_path = JSONFileXComBackend.FILE_PATH
        with open(base_path, mode="wb") as f:
            f.write(s_val_encoded)
        return BaseXCom.serialize_value(base_path)

    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize the value from the JSON file that the stored path points to.

        Falls back to the value stored in the database if the file content cannot be decoded.
        """
        data = BaseXCom.deserialize_value(result)
        path = JSONFileXComBackend.FILE_PATH
        try:
            with open(path, mode="rb") as f:
                return json.load(f, cls=XComDecoder)
        except (TypeError, ValueError):
            return data


  2. Set up breeze to use this backend (set up an init.sh):
export AIRFLOW__CORE__XCOM_BACKEND="my_xcom.JSONFileXComBackend"
  3. Launch breeze start-airflow normally
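
The value of that environment variable is a dotted path to a class. As a rough sketch of how such a path can be turned into a class (this is not Airflow's resolve_xcom_backend, which reads the value through the Airflow configuration), the helpers import_dotted_path and resolve_backend_sketch below are hypothetical and use only importlib:

```python
from __future__ import annotations

import importlib
import os
from collections.abc import Mapping


def import_dotted_path(dotted_path: str) -> type:
    """Import 'package.module.ClassName' and return the class object."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted module path")
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


def resolve_backend_sketch(default: type, env: Mapping[str, str] | None = None) -> type:
    """Return the configured class, or the default when nothing is configured."""
    env = os.environ if env is None else env
    dotted_path = env.get("AIRFLOW__CORE__XCOM_BACKEND", "")
    if not dotted_path:
        return default
    cls = import_dotted_path(dotted_path)
    # A custom backend is expected to extend the default class.
    if not issubclass(cls, default):
        raise TypeError(f"{dotted_path} must be a subclass of {default.__name__}")
    return cls
```

For the setup above, the module my_xcom has to be importable by the worker, otherwise the import step fails before any task runs.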

Run the dag normally

Task 1

image

XCom pushed from task 1:
image

Logs:

image

Logs showing what was actually pushed:

[2025-03-10, 11:31:03] INFO - Done. Returned value was: ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message'] source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"

The SetXcom call:

[2025-03-10, 11:31:03] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"

Task 2:
image

Xcom pushed by task 2:

image

Important logs from task 2:

[2025-03-10, 11:31:04] INFO - Pushing xcom ti="RuntimeTaskInstance(id=UUID('01957fd2-9452-757c-8488-1bf67ddfa350'), task_id='pull_xcom_task', dag_id='xcom_example', run_id='manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b', try_number=1, map_index=-1, hostname='0ce659e47888', task=<Task(PythonOperator): pull_xcom_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 10, 11, 31, 3, 845298, tzinfo=TzInfo(UTC)))" source="task"
[2025-03-10, 11:31:04] INFO - Pulled value ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message'] chan="stdout" source="task"
[2025-03-10, 11:31:04] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"pull_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"
[2025-03-10, 11:31:04] INFO - Pushing value ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified'] chan="stdout" source="task"

Inside the worker, check the XCom JSON file:

root@0ce659e47888:/opt/airflow# cat /tmp/airflow_xcom.json
["This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified"]root@0ce659e47888:/opt/airflow#

Status of the table:

1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10 11:31:03.391402 +00:00
1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10 11:31:04.431473 +00:00

Observe that the path is stored.
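
Note that both rows point at the same file, and the cat output above shows the value written by the second task: the test backend uses a single fixed FILE_PATH, so each push overwrites the previous one. That is fine for this test, but a backend meant for real use would need one location per XCom. A minimal sketch of that idea, with hypothetical helper names (xcom_path, write_xcom, read_xcom) and no Airflow dependency:

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


def xcom_path(
    base_dir: Path, *, dag_id: str, run_id: str, task_id: str, key: str, map_index: int = -1
) -> Path:
    """Build a distinct file path per (dag, run, task, key, map_index)."""
    # run_id contains characters such as ':' and '+', so replace anything unusual.
    # Simplification: two run_ids differing only in those characters would collide.
    safe_run_id = "".join(c if c.isalnum() or c in "-_." else "_" for c in run_id)
    return base_dir / dag_id / safe_run_id / task_id / f"{key}.{map_index}.json"


def write_xcom(base_dir: Path, value: Any, **ids: Any) -> str:
    """Write the value as JSON and return the path that would be stored in the DB."""
    path = xcom_path(base_dir, **ids)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value), encoding="utf-8")
    return str(path)


def read_xcom(path: str) -> Any:
    """Read back the value that a stored path refers to."""
    return json.loads(Path(path).read_text(encoding="utf-8"))
```

In a backend class, write_xcom would be called from serialize_value (which receives dag_id, run_id, task_id, key and map_index) and read_xcom from deserialize_value.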

TODO / What's next:



@amoghrajesh amoghrajesh requested a review from ashb March 4, 2025 08:19
Member

@ashb ashb left a comment

Not sure I love the place this is implemented, I wonder if it's time for an execution_time/xcom.py file

@amoghrajesh amoghrajesh self-assigned this Mar 10, 2025
@amoghrajesh amoghrajesh marked this pull request as ready for review March 10, 2025 11:35
@amoghrajesh amoghrajesh requested review from ashb and removed request for bolkedebruin March 10, 2025 11:40
@amoghrajesh
Contributor Author

Did a final round of regression tests with a few example dags both with Xcom DB backend and custom XCom backend (described in PR desc)

DAG1:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ["This is a long message"] * 10
    # value = ("key", "value")
    return value
#
# def push_to_xcom2(**kwargs):
#     value = ["Hello, XCom2!"]
#     return value

def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])

    print("Pulled value", xcom_value1)

    topush = xcom_value1 + ["modified"] * 10
    print("Pushing value", topush)

    # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return topush

with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )

    # push_xcom_task2 = PythonOperator(
    #     task_id='push_xcom_task2',
    #     python_callable=push_to_xcom2,
    # )

    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )

    push_xcom_task >> pull_xcom_task

DAG2:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ("Hello", "XCom!")
    return value

with DAG(
    'xcom_tuple_return',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='tuple_task',
        python_callable=push_to_xcom,
    )

DAG3:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime

# Task to push an XCom
def push_xcom(**kwargs):
    kwargs['ti'].xcom_push(key="example_key", value="example_value")

# Task to pull an XCom
def pull_xcom(**kwargs):
    pulled_value = kwargs['ti'].xcom_pull(task_ids="push_task", key="example_key")
    print(f"Pulled XCom Value: {pulled_value}")

# Define the DAG
with DAG(
    dag_id="simple_xcom_pull_example",
    default_args={"owner": "airflow"},
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Push XCom
    push_task = PythonOperator(
        task_id="push_task",
        python_callable=push_xcom,
    )

    # Task 2: Pull XCom
    pull_task = PythonOperator(
        task_id="pull_task",
        python_callable=pull_xcom,
    )

    # Define task dependencies
    push_task >> pull_task

DAG4:

"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
        "example_xcom",
    start_date=datetime(2023, 11, 28),
    default_args={"owner": "airflow"},
    schedule="@daily",
    catchup=False,
    tags=["core"],
)

value_1 = [1, 2, 3]
value_2 = {"a": "b"}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs["ti"].xcom_push(key="value from pusher 1", value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs["ti"]

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids="push")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=["push", "push_by_returning"]
    )
    print(f"pulled_value_1 is {pulled_value_1}")
    print(f"pulled_value_2 is {pulled_value_2}")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")


push1 = PythonOperator(
    task_id="push",
    dag=dag,
    python_callable=push,
    depends_on_past=True,
)

push2 = PythonOperator(
    task_id="push_by_returning",
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id="puller",
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]

These work as expected, both with the custom XCom backend and with the XCom DB backend.

@amoghrajesh
Contributor Author

Huh, finally got a green run here. I am going to merge this since it has already grown too large; I will take up any issues in follow-ups.

@amoghrajesh
Contributor Author

Thanks @ashb and @pierrejeambrun for the review.

@amoghrajesh amoghrajesh merged commit 91b0f61 into apache:main Mar 17, 2025
89 checks passed
@amoghrajesh amoghrajesh deleted the AIP-72-custom-xcom-backend branch March 17, 2025 15:19
@eladkal
Contributor

eladkal commented Mar 18, 2025

You mentioned:

Situation 1: Using the database for xcoms (no xcom backend)
Situation 2: Using a custom xcom backend.

There is also a situation 3 that mixes both, using AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD
https://airflow.apache.org/docs/apache-airflow-providers-common-io/stable/configurations-ref.html#xcom-objectstorage-threshold

Are we covered in this case as well?

@amoghrajesh
Contributor Author

@eladkal yep, that case is covered as well, since the logic for the threshold is handled within the custom xcom backend. No part of that has been tweaked at all.
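
For context, a threshold-based backend decides per value whether to keep it in the database or to write it elsewhere and keep only a reference. The sketch below is not the common.io provider's code. It assumes the semantics described in the configuration reference linked above as I understand them (a negative threshold keeps everything in the database, 0 sends everything to object storage, and a positive value sends anything larger than that many bytes to object storage), which is worth verifying against that page. OBJECT_STORE, store_with_threshold and load are hypothetical names.

```python
from __future__ import annotations

import json
from typing import Any

OBJECT_STORE: dict[str, bytes] = {}  # hypothetical stand-in for object storage


def store_with_threshold(value: Any, *, threshold: int, path: str) -> Any:
    """Return what would be stored in the DB: the value itself or a reference."""
    encoded = json.dumps(value).encode("utf-8")
    if threshold < 0 or len(encoded) <= threshold:
        # No threshold configured, or the value is small enough: keep it inline.
        return value
    OBJECT_STORE[path] = encoded
    return path


def load(stored: Any) -> Any:
    """Resolve a reference if there is one, otherwise return the inline value."""
    if isinstance(stored, str) and stored in OBJECT_STORE:
        return json.loads(OBJECT_STORE[stored])
    return stored
```

The decision lives entirely inside the backend's serialize and deserialize steps, which is why the mixed case does not need separate handling in the task runner.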
