@amoghrajesh
Contributor

@amoghrajesh amoghrajesh commented Feb 13, 2025

closes: #46513
related: #45231

Current issues

  • While setting XComs, multiple layers of serialisation were applied and deserialisation was not happening in the right place. As a result, we ended up storing a double- or triple-encoded JSON string as the XCom value in the xcom table. Example: Issues while reading Xcom values #46513

Here is what the stored values look like (the first two are from AF2, the last two are with the task sdk):
image (20)

  • The original decision was made to keep the interface to the API server simple: interact only in JSON strings, with native-language serialisation and deserialisation handled by the respective SDKs.
  • In practice, this leads to overly complex XCom serialisation and deserialisation on the client too.

Current flow for setting an xcom from task sdk:

  1. Task runner serialises and sends the value for xcom: https://github.com/apache/airflow/blob/main/task_sdk/src/airflow/sdk/execution_time/task_runner.py#L353 (Looking like: '"Hello, XCom!"')
  2. The API server currently writes this value directly to the DB (BaseXcom). Because the column is a JSON field, the value is serialised again on write, which leads to double serialisation.
  3. So we end up storing a multiply serialised value for the XCom.
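
The double encoding in the steps above can be reproduced with nothing but the standard json module. This is a minimal sketch, not the task runner or BaseXcom code: the two json.dumps calls stand in for the task runner's serialisation and for the JSON column's serialisation on write, and all variable names are illustrative.

```python
import json

value = "Hello, XCom!"

# Step 1: the task runner serialises the value before sending it.
sent_by_task_runner = json.dumps(value)

# Step 2: a JSON column serialises whatever it is given, so the already
# serialised string gets encoded a second time on write.
stored_in_db = json.dumps(sent_by_task_runner)

print(sent_by_task_runner)
print(stored_in_db)

# A single json.loads no longer recovers the original value.
decoded_once = json.loads(stored_in_db)
decoded_twice = json.loads(decoded_once)
```

After one decode the reader still holds a JSON string, which is the symptom reported in #46513.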

Current flow for getting an xcom:

  1. The task runner in the task sdk requests an XCom.
  2. The client calls the API server, which deserialises the value once and sends it. What it sends comes from the DB, where the value is multiply serialised.
  3. The task runner gets the XCOM from the client and performs a deserialise on it: https://github.com/apache/airflow/blob/main/task_sdk/src/airflow/sdk/execution_time/task_runner.py#L321

Proposal to simplify

  • JSON is a widely accepted data interchange format for client-server communication, supported across languages and frameworks.
  • Because of this, we can have our execution API server send and receive JSON, which makes it simpler to interact with.
  • To do this, we let the execution API server deal with JSON-compliant types (JSON values such as dict, list, str, int), not JSON strings, which are harder to work with. For example:

The new API response for get xcom now looks like:

{
    "key": "return_value",
    "value": {
        "Hello": "XCom2!"
    }
}

instead of:

{
    "key": "return_value",
    "value": "{\"Hello\": \"XCom2!\"}"
}
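
To make the difference concrete for a client, here is a minimal Python sketch that parses both response bodies shown above with the standard json module. The variable names are illustrative only and are not part of the task sdk.

```python
import json

# Response body in the new format: "value" is already a JSON object.
new_body = '{"key": "return_value", "value": {"Hello": "XCom2!"}}'
# Response body in the old format: "value" is a JSON string wrapping JSON.
old_body = r'{"key": "return_value", "value": "{\"Hello\": \"XCom2!\"}"}'

new_value = json.loads(new_body)["value"]
old_value = json.loads(old_body)["value"]

# The new format is usable after a single parse of the HTTP body.
print(type(new_value).__name__)
# The old format yields a str and needs a second, client-side json.loads.
print(type(old_value).__name__)
old_value_decoded = json.loads(old_value)
```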

  • The responsibility for sending and receiving JSON now lies with the client, in our case the task sdk. It sends JSON-compliant types to the execution API and receives JSON-compliant types back.
  • If the client needs special encoding for any non-JSON-compliant types (custom-defined or native to a language), that has to be handled in the client itself.
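
As an illustration of handling a non-JSON-compliant type in the client, here is a minimal sketch that converts a datetime into a JSON-compliant dict before sending and restores it after receiving. The marker key and the encode/decode helpers are hypothetical and are not the format the task sdk uses.

```python
import json
from datetime import datetime, timezone

# Hypothetical marker key, chosen only for this sketch.
TYPE_KEY = "__hypothetical_type__"


def encode(value):
    """Turn a Python value into JSON-compliant types before sending it."""
    if isinstance(value, datetime):
        return {TYPE_KEY: "datetime", "iso": value.isoformat()}
    if isinstance(value, dict):
        return {k: encode(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [encode(v) for v in value]
    return value


def decode(value):
    """Restore client-side types from JSON-compliant types after receiving."""
    if isinstance(value, dict):
        if value.get(TYPE_KEY) == "datetime":
            return datetime.fromisoformat(value["iso"])
        return {k: decode(v) for k, v in value.items()}
    if isinstance(value, list):
        return [decode(v) for v in value]
    return value


payload = {"when": datetime(2025, 2, 13, tzinfo=timezone.utc), "n": [1, 2]}
encoded = encode(payload)
# json.dumps / json.loads stand in for the HTTP round trip to the API.
restored = decode(json.loads(json.dumps(encoded)))
```

The API only ever sees dicts, lists, strings and numbers; the knowledge of what a datetime is stays entirely in the client.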

Impact on custom XCOM backends.

Flow:

  1. On a set, the custom XCom backend writes the data to the object store and stores the serialised path in the database: https://github.com/apache/airflow/blob/main/airflow/models/xcom.py#L185-L192
  2. On a get_value, the custom XCom backend does this: https://github.com/apache/airflow/blob/main/providers/common/io/src/airflow/providers/common/io/xcom/backend.py#L165 which means it returns raw data and not serialised data.
  3. TLDR: Custom XCom backends store the "path" to the remotely stored XCom object in the DB as a JSON type, but store the data at that "path" without serialisation (as JSON objects). So the interface to this backend is in JSON objects, which is precisely what the proposed flow above uses.
  4. Hence there should be no impact on custom XCom backends.

Testing:

DAG used:

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

def push_to_xcom(**kwargs):
    value = ["Hello, XCom!"]
    return value

def push_to_xcom2(**kwargs):
    value = ["Hello, XCom2!"]
    return value

def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])
    xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return xcom_value1 + xcom_value2

with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:

    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )

    push_xcom_task2 = PythonOperator(
        task_id='push_xcom_task2',
        python_callable=push_to_xcom2,
    )

    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )

    push_xcom_task >> push_xcom_task2 >> pull_xcom_task

This tests both push and pull behaviour.

task 1
image

task 2
image

task 3 (puller)
image

DB state (single serialisation only):
image



@amoghrajesh
Contributor Author

amoghrajesh commented Feb 13, 2025

Yeah, I think I have to fix the execution API tests.

@amoghrajesh
Contributor Author

Oh, some task sdk tests will fail now, will take a look!

@amoghrajesh amoghrajesh changed the title AIP-72: Do not doubly/triply serialize while setting xcoms AIP-72: Simplify the interface for XCOM endpoints between task sdk and execution API Feb 14, 2025
@amoghrajesh amoghrajesh changed the title AIP-72: Simplify the interface for XCOM endpoints between task sdk and execution API AIP-72: Simplify the XCOM interface between task sdk and execution API Feb 14, 2025
@amoghrajesh amoghrajesh requested a review from ashb February 14, 2025 06:21
Member

@kaxil kaxil left a comment

few comments -- won't be looking until I am back -- but reviewing it for some context :)

Member

@kaxil kaxil left a comment

FYI: even after this PR eliminates one step, we would still have multiple serialization/deserialization steps that we should consolidate.

Storing data (Go client example):

  1. Go serializes the object (json.Marshal) → Sends it as an HTTP request.
  2. FastAPI deserializes it (JsonValue) → Converts it into a Python dict.
  3. SQLAlchemy stores it (Column(JSON)) → Serializes it back into JSON in the database.

value = Column(JSON().with_variant(postgresql.JSONB, "postgresql"))

Retrieving Data:

  1. SQLAlchemy loads it as a Python dict from the DB.
  2. FastAPI serializes it back into JSON.
  3. Go receives it and deserializes it (json.Unmarshal or a different library).
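
The store and retrieve hops listed above can be simulated in a few lines. This is only a sketch: Python's json.dumps and json.loads stand in for Go's json.Marshal and json.Unmarshal, for FastAPI's request and response handling, and for the JSON column's behaviour. No real client, server, or database is involved.

```python
import json

obj = {"Hello": "XCom2!"}

# Storing
wire_request = json.dumps(obj)          # client serializes (json.Marshal in Go)
server_value = json.loads(wire_request) # API server parses the body into a dict
db_text = json.dumps(server_value)      # JSON column serializes on write

# Retrieving
loaded = json.loads(db_text)            # JSON column deserializes on read
wire_response = json.dumps(loaded)      # API server serializes the response
client_value = json.loads(wire_response)  # client deserializes (json.Unmarshal in Go)

print(db_text)
print(client_value)
```

Each hop is a matched encode/decode pair, so the value is stored singly encoded and comes back unchanged. The remaining cost is the number of conversions, not nested encoding.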

@kaxil
Member

kaxil commented Feb 14, 2025

Impact on custom XCOM backends.
Flow:

TLDR: Custom XCOM backends store the "path" to the xcom object remotely stored in the DB as a JSON type, but store the data at that "path" without serialisation (as JSON objects). So the interface to interact with this backend is in JSON objects, which is what the new proposed flow above does precisely.

Not always. For example, XComObjectStorageBackend only stores the value in a remote object if it is larger than a certain threshold; for smaller values it serializes the object into a string using json.dumps:

# we will always serialize ourselves and not by BaseXCom as the deserialize method
# from BaseXCom accepts only XCom objects and not the value directly
s_val = json.dumps(value, cls=XComEncoder)

threshold = _get_threshold()
if threshold < 0 or len(s_val_encoded) < threshold:  # Either no threshold or value is small enough.
    if AIRFLOW_V_3_0_PLUS:
        return s_val
    else:
        # TODO: Remove this branch once we drop support for Airflow 2
        # This is for Airflow 2.10 where the value is expected to be bytes
        return s_val_encoded
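
The threshold branch above can be sketched in a self-contained way as follows. This is a simplified stand-in, not the provider's code: serialize_value_sketch, the upload callback and the in-memory store are hypothetical, plain json.dumps replaces XComEncoder, and returning the serialised path for large values is an assumption based on the description in this thread.

```python
import json


def serialize_value_sketch(value, threshold, upload):
    """Return what would go into the DB column for this value.

    threshold < 0 means "no threshold": everything stays inline.
    upload is a hypothetical callback that writes bytes to an object
    store and returns the path it wrote to.
    """
    s_val = json.dumps(value)
    s_val_encoded = s_val.encode("utf-8")
    if threshold < 0 or len(s_val_encoded) < threshold:
        # Small value: the serialised JSON string itself is stored.
        return s_val
    # Large value: only the (serialised) path is stored in the DB.
    path = upload(s_val_encoded)
    return json.dumps(path)


object_store = {}  # in-memory stand-in for a remote object store


def upload(data):
    path = f"xcom/{len(object_store)}"
    object_store[path] = data
    return path


small = serialize_value_sketch([1, 2], threshold=100, upload=upload)
large = serialize_value_sketch(list(range(100)), threshold=100, upload=upload)
```

With a threshold of 100 bytes, the small list is kept inline while the large one is replaced by a path into the store.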

Member

@ashb ashb left a comment

@amoghrajesh
Contributor Author

amoghrajesh commented Feb 15, 2025

Impact on custom XCOM backends.
Flow:

TLDR: Custom XCOM backends store the "path" to the xcom object remotely stored in the DB as a JSON type, but store the data at that "path" without serialisation (as JSON objects). So the interface to interact with this backend is in JSON objects, which is what the new proposed flow above does precisely.

Not always, example XComObjectStorageBackend only stores value in a remote object if it is greater than a certain threshold, for others it serializes the objects into string using json.dumps

# we will always serialize ourselves and not by BaseXCom as the deserialize method
# from BaseXCom accepts only XCom objects and not the value directly
s_val = json.dumps(value, cls=XComEncoder)

threshold = _get_threshold()
if threshold < 0 or len(s_val_encoded) < threshold:  # Either no threshold or value is small enough.
    if AIRFLOW_V_3_0_PLUS:
        return s_val
    else:
        # TODO: Remove this branch once we drop support for Airflow 2
        # This is for Airflow 2.10 where the value is expected to be bytes
        return s_val_encoded

Thanks! So yeah, that's true. We are keeping the same behaviour even with this change: for smaller XCom values we store the value in the DB in serialised form, and for larger data we store the path in serialised form. So when get xcom is called for an XCom which went to the object store, the data would be returned as a JSON object, correct? And the same goes for smaller XCom values too. Is that right, or have I misunderstood?

@amoghrajesh
Contributor Author

Hmmm, I didn't think it would make the tests fail like this:

  {                                {                               
    ('t3', None): JsonValue(root=    ('t3', None): [               
  [['a', 'b'], [4], ['z']]),                                       
                                       ['a', 'b'],                 
                                       [4],                        
                                       ['z'],                      
                                     ],                            
    ('tg.t1', 0): [                  ('tg.t1', 0): [               
      'a',                             'a',                        
      'b',                             'b',                        
    ],                               ],                            
    ('tg.t1', 1): [4],               ('tg.t1', 1): [4],            
    ('tg.t1', 2): ['z'],             ('tg.t1', 2): ['z'],          
    ('tg.t2', 0): JsonValue(root=    ('tg.t2', 0): [               
  ['a', 'b']),                                                     
    ('tg.t2', 1): JsonValue(root=      'a',                        
  [4]),                                                            
    ('tg.t2', 2): JsonValue(root=      'b',                        
  ['z']),                                                          
                                     ],                            
                                     ('tg.t2', 1): [4],            
                                     ('tg.t2', 2): ['z'],          
  }                                }

@amoghrajesh amoghrajesh self-assigned this Feb 17, 2025
@amoghrajesh amoghrajesh merged commit e3aabea into apache:main Feb 17, 2025
63 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-fix-xcom-double-ser branch February 17, 2025 07:16
dantonbertuol pushed a commit to dantonbertuol/airflow that referenced this pull request Feb 17, 2025