Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make XCom display as react json #40640

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions airflow/api_connexion/endpoints/xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
from airflow.api_connexion import security
from airflow.api_connexion.exceptions import BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema
from airflow.api_connexion.schemas.xcom_schema import (
XComCollection,
xcom_collection_schema,
xcom_schema_native,
xcom_schema_string,
)
from airflow.auth.managers.models.resource_details import DagAccessEntity
from airflow.models import DagRun as DR, XCom
from airflow.settings import conf
Expand Down Expand Up @@ -88,6 +93,7 @@ def get_xcom_entry(
xcom_key: str,
map_index: int = -1,
deserialize: bool = False,
stringify: bool = True,
session: Session = NEW_SESSION,
) -> APIResponse:
"""Get an XCom entry."""
Expand Down Expand Up @@ -119,4 +125,7 @@ def get_xcom_entry(
stub.value = XCom.deserialize_value(stub)
item = stub

return xcom_schema.dump(item)
if stringify:
return xcom_schema_string.dump(item)

return xcom_schema_native.dump(item)
24 changes: 22 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1904,6 +1904,19 @@ paths:
This parameter is not meaningful when using the default XCom backend.

*New in version 2.4.0*
- in: query
name: stringify
schema:
type: boolean
default: true
required: false
description: |
Whether to convert the XCom value to be a string. XCom values can be of Any data type.

If set to true (default) the Any value will be returned as string, e.g. a Python representation
of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.

*New in version 2.10.0*
responses:
"200":
description: Success.
Expand Down Expand Up @@ -3921,8 +3934,15 @@ components:
- type: object
properties:
value:
type: string
description: The value
anyOf:
- type: string
- type: number
- type: integer
- type: boolean
- type: array
items: {}
- type: object
description: The value(s),

# Python objects
# Based on
Expand Down
15 changes: 11 additions & 4 deletions airflow/api_connexion/schemas/xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,16 @@ class Meta:
dag_id = auto_field()


class XComSchema(XComCollectionItemSchema):
"""XCom schema."""
class XComSchemaNative(XComCollectionItemSchema):
"""XCom schema with native return type."""

value = auto_field()
value = fields.Raw()


class XComSchemaString(XComCollectionItemSchema):
"""XCom schema forced to be string."""

value = fields.String()


class XComCollection(NamedTuple):
Expand All @@ -60,6 +66,7 @@ class XComCollectionSchema(Schema):
total_entries = fields.Int()


xcom_schema = XComSchema()
xcom_schema_native = XComSchemaNative()
xcom_schema_string = XComSchemaString()
xcom_collection_item_schema = XComCollectionItemSchema()
xcom_collection_schema = XComCollectionSchema()
2 changes: 1 addition & 1 deletion airflow/www/static/js/api/useTaskXcom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export const useTaskXcomEntry = ({
.replace("_DAG_RUN_ID_", dagRunId)
.replace("_TASK_ID_", taskId)
.replace("_XCOM_KEY_", xcomKey),
{ params: { map_index: mapIndex } }
{ params: { map_index: mapIndex, stringify: false } }
),
{
enabled: !!xcomKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* under the License.
*/

import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react";
import { Alert, AlertIcon, Spinner, Td, Tr } from "@chakra-ui/react";
import React from "react";

import { useTaskXcomEntry } from "src/api";
import ErrorAlert from "src/components/ErrorAlert";
import RenderedJsonField from "src/components/RenderedJsonField";
import type { Dag, DagRun, TaskInstance } from "src/types";

interface Props {
Expand Down Expand Up @@ -54,18 +55,30 @@ const XcomEntry = ({
tryNumber: tryNumber || 1,
});

let content = <Text fontFamily="monospace">{xcom?.value}</Text>;
let content = null;
if (isLoading) {
content = <Spinner />;
} else if (error) {
content = <ErrorAlert error={error} />;
} else if (!xcom) {
} else if (!xcom || !xcom.value) {
content = (
<Alert status="info">
<AlertIcon />
No value found for XCom key
</Alert>
);
} else {
let xcomString = "";
if (typeof xcom.value !== "string") {
try {
xcomString = JSON.stringify(xcom.value);
} catch (e) {
// skip
}
} else {
xcomString = xcom.value as string;
}
content = <RenderedJsonField content={xcomString} />;
}

return (
Expand Down
18 changes: 16 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1599,8 +1599,13 @@ export interface components {
} & components["schemas"]["CollectionInfo"];
/** @description Full representations of XCom entry. */
XCom: components["schemas"]["XComCollectionItem"] & {
/** @description The value */
value?: string;
/** @description The value(s), */
value?: Partial<string> &
Partial<number> &
Partial<number> &
Partial<boolean> &
Partial<unknown[]> &
Partial<{ [key: string]: unknown }>;
};
/**
* @description DAG details.
Expand Down Expand Up @@ -4439,6 +4444,15 @@ export interface operations {
* *New in version 2.4.0*
*/
deserialize?: boolean;
/**
* Whether to convert the XCom value to be a string. XCom values can be of Any data type.
*
* If set to true (default) the Any value will be returned as string, e.g. a Python representation
* of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored.
*
* *New in version 2.10.0*
*/
stringify?: boolean;
};
};
responses: {
Expand Down
38 changes: 33 additions & 5 deletions tests/api_connexion/endpoints/test_xcom_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ def teardown_method(self) -> None:


class TestGetXComEntry(TestXComEndpoint):
def test_should_respond_200(self):
def test_should_respond_200_stringify(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}",
environ_overrides={"REMOTE_USER": "test"},
Expand All @@ -145,7 +145,33 @@ def test_should_respond_200(self):
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": "TEST_VALUE",
"value": "{'key': 'value'}",
}

def test_should_respond_200_native(self):
dag_id = "test-dag-id"
task_id = "test-task-id"
execution_date = "2005-04-02T00:00:00+00:00"
xcom_key = "test-xcom-key"
execution_date_parsed = parse_execution_date(execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed)
self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"})
response = self.client.get(
f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false",
environ_overrides={"REMOTE_USER": "test"},
)
assert 200 == response.status_code

current_data = response.json
current_data["timestamp"] = "TIMESTAMP"
assert current_data == {
"dag_id": dag_id,
"execution_date": execution_date,
"key": xcom_key,
"task_id": task_id,
"map_index": -1,
"timestamp": "TIMESTAMP",
"value": {"key": "value"},
}

def test_should_raise_404_for_non_existent_xcom(self):
Expand Down Expand Up @@ -192,7 +218,9 @@ def test_should_raise_403_forbidden(self):
)
assert response.status_code == 403

def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, *, backend=XCom):
def _create_xcom_entry(
self, dag_id, run_id, execution_date, task_id, xcom_key, xcom_value="TEST_VALUE", *, backend=XCom
):
with create_session() as session:
dagrun = DagRun(
dag_id=dag_id,
Expand All @@ -207,7 +235,7 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key,
session.add(ti)
backend.set(
key=xcom_key,
value="TEST_VALUE",
value=xcom_value,
run_id=run_id,
task_id=task_id,
dag_id=dag_id,
Expand Down
6 changes: 3 additions & 3 deletions tests/api_connexion/schemas/test_xcom_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
XComCollection,
xcom_collection_item_schema,
xcom_collection_schema,
xcom_schema,
xcom_schema_string,
)
from airflow.models import DagRun, XCom
from airflow.utils.dates import parse_execution_date
Expand Down Expand Up @@ -199,7 +199,7 @@ def test_serialize(self, create_xcom, session):
value=pickle.dumps(b"test_binary"),
)
xcom_model = session.query(XCom).first()
deserialized_xcom = xcom_schema.dump(xcom_model)
deserialized_xcom = xcom_schema_string.dump(xcom_model)
assert deserialized_xcom == {
"key": "test_key",
"timestamp": self.default_time,
Expand All @@ -220,7 +220,7 @@ def test_deserialize(self):
"dag_id": "test_dag",
"value": b"test_binary",
}
result = xcom_schema.load(xcom_dump)
result = xcom_schema_string.load(xcom_dump)
assert result == {
"key": "test_key",
"timestamp": self.default_time_parsed,
Expand Down