diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 68a9ab2d45f52..59fa9f5acaaa5 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -25,7 +25,12 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound from airflow.api_connexion.parameters import check_limit, format_parameters -from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema +from airflow.api_connexion.schemas.xcom_schema import ( + XComCollection, + xcom_collection_schema, + xcom_schema_native, + xcom_schema_string, +) from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models import DagRun as DR, XCom from airflow.settings import conf @@ -88,6 +93,7 @@ def get_xcom_entry( xcom_key: str, map_index: int = -1, deserialize: bool = False, + stringify: bool = True, session: Session = NEW_SESSION, ) -> APIResponse: """Get an XCom entry.""" @@ -119,4 +125,7 @@ def get_xcom_entry( stub.value = XCom.deserialize_value(stub) item = stub - return xcom_schema.dump(item) + if stringify: + return xcom_schema_string.dump(item) + + return xcom_schema_native.dump(item) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 6b5800f870413..ee4d82ea7815a 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1904,6 +1904,19 @@ paths: This parameter is not meaningful when using the default XCom backend. *New in version 2.4.0* + - in: query + name: stringify + schema: + type: boolean + default: true + required: false + description: | + Whether to convert the XCom value to be a string. XCom values can be of Any data type. + + If set to true (default) the Any value will be returned as string, e.g. a Python representation + of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored. + + *New in version 2.10.0* responses: "200": description: Success. @@ -3921,8 +3934,15 @@ components: - type: object properties: value: - type: string - description: The value + anyOf: + - type: string + - type: number + - type: integer + - type: boolean + - type: array + items: {} + - type: object + description: The value(s), # Python objects # Based on diff --git a/airflow/api_connexion/schemas/xcom_schema.py b/airflow/api_connexion/schemas/xcom_schema.py index 5894db8b1ad8b..625f05bd14597 100644 --- a/airflow/api_connexion/schemas/xcom_schema.py +++ b/airflow/api_connexion/schemas/xcom_schema.py @@ -40,10 +40,16 @@ class Meta: dag_id = auto_field() -class XComSchema(XComCollectionItemSchema): - """XCom schema.""" +class XComSchemaNative(XComCollectionItemSchema): + """XCom schema with native return type.""" - value = auto_field() + value = fields.Raw() + + +class XComSchemaString(XComCollectionItemSchema): + """XCom schema forced to be string.""" + + value = fields.String() class XComCollection(NamedTuple): @@ -60,6 +66,7 @@ class XComCollectionSchema(Schema): total_entries = fields.Int() -xcom_schema = XComSchema() +xcom_schema_native = XComSchemaNative() +xcom_schema_string = XComSchemaString() xcom_collection_item_schema = XComCollectionItemSchema() xcom_collection_schema = XComCollectionSchema() diff --git a/airflow/www/static/js/api/useTaskXcom.ts b/airflow/www/static/js/api/useTaskXcom.ts index 1faa19005a906..403233285eb11 100644 --- a/airflow/www/static/js/api/useTaskXcom.ts +++ b/airflow/www/static/js/api/useTaskXcom.ts @@ -63,7 +63,7 @@ export const useTaskXcomEntry = ({ .replace("_DAG_RUN_ID_", dagRunId) .replace("_TASK_ID_", taskId) .replace("_XCOM_KEY_", xcomKey), - { params: { map_index: mapIndex } } + { params: { map_index: mapIndex, stringify: false } } ), { enabled: !!xcomKey, diff --git a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx index 8523181ae2146..2e9ba769ae099 100644 --- a/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx +++ b/airflow/www/static/js/dag/details/taskInstance/Xcom/XcomEntry.tsx @@ -17,11 +17,12 @@ * under the License. */ -import { Alert, AlertIcon, Spinner, Td, Text, Tr } from "@chakra-ui/react"; +import { Alert, AlertIcon, Spinner, Td, Tr } from "@chakra-ui/react"; import React from "react"; import { useTaskXcomEntry } from "src/api"; import ErrorAlert from "src/components/ErrorAlert"; +import RenderedJsonField from "src/components/RenderedJsonField"; import type { Dag, DagRun, TaskInstance } from "src/types"; interface Props { @@ -54,18 +55,30 @@ const XcomEntry = ({ tryNumber: tryNumber || 1, }); - let content = {xcom?.value}; + let content = null; if (isLoading) { content = ; } else if (error) { content = ; - } else if (!xcom) { + } else if (!xcom || !xcom.value) { content = ( No value found for XCom key ); + } else { + let xcomString = ""; + if (typeof xcom.value !== "string") { + try { + xcomString = JSON.stringify(xcom.value); + } catch (e) { + // skip + } + } else { + xcomString = xcom.value as string; + } + content = ; } return ( diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 2a92c2c087a6d..97ec6a4e12a52 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -1599,8 +1599,13 @@ export interface components { } & components["schemas"]["CollectionInfo"]; /** @description Full representations of XCom entry. */ XCom: components["schemas"]["XComCollectionItem"] & { - /** @description The value */ - value?: string; + /** @description The value(s), */ + value?: Partial & + Partial & + Partial & + Partial & + Partial & + Partial<{ [key: string]: unknown }>; }; /** * @description DAG details. @@ -4439,6 +4444,15 @@ export interface operations { * *New in version 2.4.0* */ deserialize?: boolean; + /** + * Whether to convert the XCom value to be a string. XCom values can be of Any data type. + * + * If set to true (default) the Any value will be returned as string, e.g. a Python representation + * of a dict. If set to false it will return the raw data as dict, list, string or whatever was stored. + * + * *New in version 2.10.0* + */ + stringify?: boolean; }; }; responses: { diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py b/tests/api_connexion/endpoints/test_xcom_endpoint.py index 1e4dbb56780cf..318e97842d9e6 100644 --- a/tests/api_connexion/endpoints/test_xcom_endpoint.py +++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py @@ -122,14 +122,14 @@ def teardown_method(self) -> None: class TestGetXComEntry(TestXComEndpoint): - def test_should_respond_200(self): + def test_should_respond_200_stringify(self): dag_id = "test-dag-id" task_id = "test-task-id" execution_date = "2005-04-02T00:00:00+00:00" xcom_key = "test-xcom-key" execution_date_parsed = parse_execution_date(execution_date) run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) - self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key) + self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) response = self.client.get( f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}", environ_overrides={"REMOTE_USER": "test"}, @@ -145,7 +145,33 @@ def test_should_respond_200(self): "task_id": task_id, "map_index": -1, "timestamp": "TIMESTAMP", - "value": "TEST_VALUE", + "value": "{'key': 'value'}", + } + + def test_should_respond_200_native(self): + dag_id = "test-dag-id" + task_id = "test-task-id" + execution_date = "2005-04-02T00:00:00+00:00" + xcom_key = "test-xcom-key" + execution_date_parsed = parse_execution_date(execution_date) + run_id = DagRun.generate_run_id(DagRunType.MANUAL, execution_date_parsed) + self._create_xcom_entry(dag_id, run_id, execution_date_parsed, task_id, xcom_key, {"key": "value"}) + response = self.client.get( + f"/api/v1/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}?stringify=false", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert 200 == response.status_code + + current_data = response.json + current_data["timestamp"] = "TIMESTAMP" + assert current_data == { + "dag_id": dag_id, + "execution_date": execution_date, + "key": xcom_key, + "task_id": task_id, + "map_index": -1, + "timestamp": "TIMESTAMP", + "value": {"key": "value"}, } def test_should_raise_404_for_non_existent_xcom(self): @@ -192,7 +218,9 @@ def test_should_raise_403_forbidden(self): ) assert response.status_code == 403 - def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, *, backend=XCom): + def _create_xcom_entry( + self, dag_id, run_id, execution_date, task_id, xcom_key, xcom_value="TEST_VALUE", *, backend=XCom + ): with create_session() as session: dagrun = DagRun( dag_id=dag_id, @@ -207,7 +235,7 @@ def _create_xcom_entry(self, dag_id, run_id, execution_date, task_id, xcom_key, session.add(ti) backend.set( key=xcom_key, - value="TEST_VALUE", + value=xcom_value, run_id=run_id, task_id=task_id, dag_id=dag_id, diff --git a/tests/api_connexion/schemas/test_xcom_schema.py b/tests/api_connexion/schemas/test_xcom_schema.py index eb3220626dcef..7a10b7e7a47b7 100644 --- a/tests/api_connexion/schemas/test_xcom_schema.py +++ b/tests/api_connexion/schemas/test_xcom_schema.py @@ -25,7 +25,7 @@ XComCollection, xcom_collection_item_schema, xcom_collection_schema, - xcom_schema, + xcom_schema_string, ) from airflow.models import DagRun, XCom from airflow.utils.dates import parse_execution_date @@ -199,7 +199,7 @@ def test_serialize(self, create_xcom, session): value=pickle.dumps(b"test_binary"), ) xcom_model = session.query(XCom).first() - deserialized_xcom = xcom_schema.dump(xcom_model) + deserialized_xcom = xcom_schema_string.dump(xcom_model) assert deserialized_xcom == { "key": "test_key", "timestamp": self.default_time, @@ -220,7 +220,7 @@ def test_deserialize(self): "dag_id": "test_dag", "value": b"test_binary", } - result = xcom_schema.load(xcom_dump) + result = xcom_schema_string.load(xcom_dump) assert result == { "key": "test_key", "timestamp": self.default_time_parsed,