@amoghrajesh

Operator extra links should never be pushed as XComs to a custom XCom backend, even when one is configured. They must always be written to the metadata DB.

This PR introduces a new private method that does this; it is called only from finalize in the task runner.
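
The routing described above can be sketched with a toy model. This is not Airflow's actual code: `FakeMetadataDB`, `FakeCustomBackend`, `push_xcom`, `_push_extra_link_to_db`, and this `finalize` are hypothetical names chosen for illustration, and in-memory dicts stand in for the real storage layers.

```python
from __future__ import annotations

from typing import Any


class FakeMetadataDB:
    """Hypothetical stand-in for the Airflow metadata DB."""

    def __init__(self) -> None:
        self.rows: dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        self.rows[key] = value


class FakeCustomBackend:
    """Hypothetical stand-in for a custom XCom backend (e.g. object storage)."""

    def __init__(self) -> None:
        self.blobs: dict[str, Any] = {}

    def store(self, key: str, value: Any) -> str:
        # Keep the payload externally and hand back a reference to it.
        self.blobs[key] = value
        return f"custom://{key}"


def push_xcom(db: FakeMetadataDB, backend: FakeCustomBackend | None, key: str, value: Any) -> None:
    """Regular push: a configured backend stores the value, the DB keeps only the reference."""
    if backend is not None:
        value = backend.store(key, value)
    db.set(key, value)


def _push_extra_link_to_db(db: FakeMetadataDB, key: str, url: str) -> None:
    """Extra-link push: write the URL itself to the DB, bypassing any backend."""
    db.set(key, url)


def finalize(db: FakeMetadataDB, extra_links: dict[str, str]) -> None:
    """The only caller of the private helper in this sketch."""
    for name, url in extra_links.items():
        _push_extra_link_to_db(db, name, url)


db = FakeMetadataDB()
backend = FakeCustomBackend()
push_xcom(db, backend, "return_value", {"a": 1})
finalize(db, {"Docs": "https://example.com"})
```

In this sketch the regular XCom ends up in the backend with only a reference in the DB, while the extra link is stored in the DB directly, so anything reading links from the DB sees the URL itself.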

Testing:

Configure an xcom backend like so:

from __future__ import annotations

import json
import os
from typing import Any, TypeVar

from airflow.sdk.execution_time.xcom import BaseXCom
from airflow.utils.json import XComDecoder, XComEncoder

T = TypeVar("T")


class JSONFileXComBackend(BaseXCom):
    FILE_PATH = "/tmp/airflow_xcom.json"

    @staticmethod
    def _read_xcom_file() -> dict:
        """Read the XCom JSON file."""
        if not os.path.exists(JSONFileXComBackend.FILE_PATH):
            return {}
        with open(JSONFileXComBackend.FILE_PATH, "r") as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                return {}

    @staticmethod
    def _write_xcom_file(data: dict) -> None:
        """Write data to the XCom JSON file."""
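        # Overwrite rather than append: appending would leave several JSON
        # documents in one file, which json.load cannot parse.
        with open(JSONFileXComBackend.FILE_PATH, "w") as f: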
            json.dump(data, f, indent=4)

    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> str:
        # we will always serialize ourselves and not by BaseXCom as the deserialize method
        # from BaseXCom accepts only XCom objects and not the value directly
        s_val = json.dumps(value, cls=XComEncoder)
        s_val_encoded = s_val.encode("utf-8")

        base_path = JSONFileXComBackend.FILE_PATH
        with open(base_path, mode="wb") as f:
            f.write(s_val_encoded)
        return BaseXCom.serialize_value(base_path)

    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserializes the value from the database or object storage.

        Compression is inferred from the file extension.
        """
        data = BaseXCom.deserialize_value(result)
        try:
            path = JSONFileXComBackend.FILE_PATH
        except (TypeError, ValueError):
            return data
        try:
            with open(path, mode="rb") as f:
                return json.load(f, cls=XComDecoder)
        except (TypeError, ValueError):
            return data


Then set these in breeze to enable it (this assumes the class above is saved as /files/my_xcom.py):

export PYTHONPATH="/files:$PYTHONPATH"
export AIRFLOW__CORE__XCOM_BACKEND="my_xcom.JSONFileXComBackend"
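
Before starting Airflow, it can help to check that the configured dotted path is importable from the current PYTHONPATH. The snippet below is a minimal sketch: `resolve_dotted_path` is a hypothetical helper written for this check, not an Airflow API, and Airflow does its own import of the configured class.

```python
import importlib
import os


def resolve_dotted_path(path: str):
    """Import 'package.module.Name' and return the attribute Name (hypothetical helper)."""
    module_name, _, attr = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path like 'module.ClassName', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


if __name__ == "__main__":
    # Reads the same environment variable exported above.
    configured = os.environ.get("AIRFLOW__CORE__XCOM_BACKEND")
    if configured:
        print(resolve_dotted_path(configured))
    else:
        print("AIRFLOW__CORE__XCOM_BACKEND is not set")
```

If the module is not on PYTHONPATH, `importlib.import_module` raises `ModuleNotFoundError`, which points at a path problem rather than a bug in the backend class.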

Before changes:
image

After changes:
image



@amoghrajesh amoghrajesh merged commit 6af34bb into apache:main Mar 25, 2025
61 checks passed
@amoghrajesh amoghrajesh deleted the xcomoperatorlink-push-to-db branch March 25, 2025 08:23
pankajkoti pushed a commit to astronomer/airflow that referenced this pull request Mar 28, 2025
shubham-pyc pushed a commit to shubham-pyc/airflow that referenced this pull request Apr 2, 2025
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Apr 4, 2025