Move XCom serialization and deserialization methods to the Task SDK #45231

@kaxil

Currently, as part of AIP-72, the XCom API endpoints we implemented in the Task Execution API Server expect a JSON-serialized string:

def set_xcom(
    dag_id: str,
    run_id: str,
    task_id: str,
    key: str,
    value: Annotated[
        str,
        Body(
            description="A JSON-formatted string representing the value to set for the XCom.",
            openapi_examples={
                "simple_value": {
                    "summary": "Simple value",
                    "value": '"value1"',
                },
                "dict_value": {
                    "summary": "Dictionary value",
                    "value": '{"key2": "value2"}',
                },
                "list_value": {
                    "summary": "List value",
                    "value": '["value1"]',
                },
            },
        ),
    ],

This was done so that the Task SDK has the same interface across languages (Python, Go, Java, etc.). The contract is simple: the API Server always deals with a JSON-formatted string, and the responsibility for serializing native objects to a string and deserializing them back lies with the language-specific clients.
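A minimal sketch of what the client side of this contract could look like in Python, using only the standard `json` module. The function names `serialize_xcom_value` and `deserialize_xcom_value` are hypothetical and are not the Task SDK's actual API; the sketch only illustrates that the client, not the server, turns native objects into the JSON string the endpoint expects.

```python
import json
from typing import Any


def serialize_xcom_value(value: Any) -> str:
    """Hypothetical client-side helper: native object -> JSON-formatted string."""
    return json.dumps(value)


def deserialize_xcom_value(payload: str) -> Any:
    """Hypothetical client-side helper: JSON-formatted string -> native object."""
    return json.loads(payload)


# The payloads match the OpenAPI examples in the endpoint definition.
simple_payload = serialize_xcom_value("value1")
dict_payload = serialize_xcom_value({"key2": "value2"})
list_payload = serialize_xcom_value(["value1"])

# A round trip on the client returns the original native object.
round_tripped = deserialize_xcom_value(dict_payload)
```

Under this split the API Server can pass the string through untouched, and a Go or Java client would only need its own equivalent of these two helpers.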

To do that, we should move the current serialization and deserialization logic to the clients. This will also allow XCom backends to be stored and used correctly, and those backends can then handle different languages as well.

As part of this task, we should also figure out how to avoid the serialization and deserialization done by SQLAlchemy, since we use the JSON type for the value column, which leads to double or triple serialization:

  1. The Task SDK serializes the value and sends it to the API Server
  2. The API Server calls XCom.serialize
  3. The SQLAlchemy JSON type serializes it further
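The effect of the three steps above can be reproduced with plain `json.dumps`, assuming each layer behaves like one more JSON encoding pass. That is a simplification: the real `XCom.serialize` and the SQLAlchemy JSON type are not called here, and `encode_n_times` / `decode_n_times` are hypothetical helpers for illustration only.

```python
import json
from typing import Any


def encode_n_times(value: Any, n: int) -> Any:
    """Apply json.dumps n times, standing in for n serialization layers."""
    for _ in range(n):
        value = json.dumps(value)
    return value


def decode_n_times(payload: Any, n: int) -> Any:
    """Undo n layers of JSON encoding with n json.loads calls."""
    for _ in range(n):
        payload = json.loads(payload)
    return payload


original = {"key2": "value2"}

once = encode_n_times(original, 1)    # what the client should send
thrice = encode_n_times(original, 3)  # what ends up stored after three passes

# A single decode of the triple-encoded value still yields a string,
# not the dictionary the reader expects.
after_one_decode = decode_n_times(thrice, 1)

# Only three decodes recover the original object.
recovered = decode_n_times(thrice, 3)
```

Each extra pass wraps the previous string in quotes and escapes its inner quotes, so a reader that decodes once gets a string instead of the original dictionary.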

Result of triple serialization:

[image]
