diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index ce8aa17e3d721..276fef3de3b25 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -4980,6 +4980,81 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + delete: + tags: + - XCom + summary: Delete Xcom Entry + description: Delete an XCom entry. + operationId: delete_xcom_entry + security: + - OAuth2PasswordBearer: [] + - HTTPBearer: [] + parameters: + - name: dag_id + in: path + required: true + schema: + type: string + title: Dag Id + - name: task_id + in: path + required: true + schema: + type: string + title: Task Id + - name: dag_run_id + in: path + required: true + schema: + type: string + title: Dag Run Id + - name: xcom_key + in: path + required: true + schema: + type: string + title: Xcom Key + - name: map_index + in: query + required: false + schema: + type: integer + minimum: -1 + default: -1 + title: Map Index + responses: + '204': + description: Successful Response + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '400': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Bad Request + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries: get: tags: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py index 6b99c1562d4ea..f6e7632b3594e 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -21,7 +21,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, Query, status -from sqlalchemy import and_, select +from sqlalchemy import and_, delete, select from sqlalchemy.orm import joinedload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity @@ -315,7 +315,6 @@ def update_xcom_entry( ) -> XComResponseNative: """Update an existing XCom entry.""" # Check if XCom entry exists - xcom_new_value = json.dumps(patch_body.value) xcom_entry = session.scalar( select(XComModel) .where( @@ -336,6 +335,47 @@ def update_xcom_entry( ) # Update XCom entry - xcom_entry.value = json.dumps(xcom_new_value) + xcom_entry.value = json.dumps(patch_body.value) return XComResponseNative.model_validate(xcom_entry) + + +@xcom_router.delete( + "/{xcom_key:path}", + status_code=status.HTTP_204_NO_CONTENT, + responses=create_openapi_http_exception_doc( + [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ] + ), + dependencies=[ + Depends(action_logging()), + Depends(requires_access_dag(method="DELETE", access_entity=DagAccessEntity.XCOM)), + ], +) +def delete_xcom_entry( + dag_id: str, + task_id: str, + dag_run_id: str, + xcom_key: str, + session: SessionDep, + map_index: Annotated[int, Query(ge=-1)] = -1, +): + """Delete an XCom entry.""" + # Delete XCom entry + result = session.execute( + delete(XComModel).where( + XComModel.dag_id == dag_id, + XComModel.task_id == task_id, + XComModel.run_id == dag_run_id, + XComModel.key == xcom_key, + XComModel.map_index == map_index, + ) + ) + + if getattr(result, "rowcount", 0) == 0: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"The XCom with key: `{xcom_key}` with mentioned task instance doesn't exist.", + ) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index aa544d1108aaf..72e686f658a89 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -908,4 +908,5 @@ export type DagRunServiceDeleteDagRunMutationResult = Awaited>; export type TaskInstanceServiceDeleteTaskInstanceMutationResult = Awaited>; export type PoolServiceDeletePoolMutationResult = Awaited>; +export type XcomServiceDeleteXcomEntryMutationResult = Awaited>; export type VariableServiceDeleteVariableMutationResult = Awaited>; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index ccd2ff54d9947..77a2c714b1f40 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -2429,6 +2429,31 @@ export const usePoolServiceDeletePool = ({ mutationFn: ({ poolName }) => PoolService.deletePool({ poolName }) as unknown as Promise, ...options }); /** +* Delete Xcom Entry +* Delete an XCom entry. +* @param data The data for the request. +* @param data.dagId +* @param data.taskId +* @param data.dagRunId +* @param data.xcomKey +* @param data.mapIndex +* @returns void Successful Response +* @throws ApiError +*/ +export const useXcomServiceDeleteXcomEntry = (options?: Omit, "mutationFn">) => useMutation({ mutationFn: ({ dagId, dagRunId, mapIndex, taskId, xcomKey }) => XcomService.deleteXcomEntry({ dagId, dagRunId, mapIndex, taskId, xcomKey }) as unknown as Promise, ...options }); +/** * Delete Variable * Delete a variable entry. * @param data The data for the request. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d016eaa00f4fd..0d8dd62537f73 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3,7 +3,7 @@ import type { CancelablePromise } from './core/CancelablePromise'; import { OpenAPI } from './core/OpenAPI'; import { request as __request } from './core/request'; -import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; +import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData, GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse, GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData, CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse, GetAssetQueuedEventsData, GetAssetQueuedEventsResponse, DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData, GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse, DeleteDagAssetQueuedEventsData, DeleteDagAssetQueuedEventsResponse, GetDagAssetQueuedEventData, GetDagAssetQueuedEventResponse, DeleteDagAssetQueuedEventData, DeleteDagAssetQueuedEventResponse, NextRunAssetsData, NextRunAssetsResponse, ListBackfillsData, ListBackfillsResponse, CreateBackfillData, CreateBackfillResponse, GetBackfillData, GetBackfillResponse, PauseBackfillData, PauseBackfillResponse, UnpauseBackfillData, UnpauseBackfillResponse, CancelBackfillData, CancelBackfillResponse, CreateBackfillDryRunData, CreateBackfillDryRunResponse, ListBackfillsUiData, ListBackfillsUiResponse, DeleteConnectionData, DeleteConnectionResponse, GetConnectionData, GetConnectionResponse, PatchConnectionData, PatchConnectionResponse, GetConnectionsData, GetConnectionsResponse, PostConnectionData, PostConnectionResponse, BulkConnectionsData, BulkConnectionsResponse, TestConnectionData, TestConnectionResponse, CreateDefaultConnectionsResponse, HookMetaDataResponse, GetDagRunData, GetDagRunResponse, DeleteDagRunData, DeleteDagRunResponse, PatchDagRunData, PatchDagRunResponse, GetUpstreamAssetEventsData, GetUpstreamAssetEventsResponse, ClearDagRunData, ClearDagRunResponse, GetDagRunsData, GetDagRunsResponse, TriggerDagRunData, TriggerDagRunResponse, WaitDagRunUntilFinishedData, WaitDagRunUntilFinishedResponse, GetListDagRunsBatchData, GetListDagRunsBatchResponse, GetDagSourceData, GetDagSourceResponse, GetDagStatsData, GetDagStatsResponse, GetConfigData, GetConfigResponse, GetConfigValueData, GetConfigValueResponse, GetConfigsResponse, ListDagWarningsData, ListDagWarningsResponse, GetDagsData, GetDagsResponse, PatchDagsData, PatchDagsResponse, GetDagData, GetDagResponse, PatchDagData, PatchDagResponse, DeleteDagData, DeleteDagResponse, GetDagDetailsData, GetDagDetailsResponse, FavoriteDagData, FavoriteDagResponse, UnfavoriteDagData, UnfavoriteDagResponse, GetDagTagsData, GetDagTagsResponse, GetDagsUiData, GetDagsUiResponse, GetLatestRunInfoData, GetLatestRunInfoResponse, GetEventLogData, GetEventLogResponse, GetEventLogsData, GetEventLogsResponse, GetExtraLinksData, GetExtraLinksResponse, GetTaskInstanceData, GetTaskInstanceResponse, PatchTaskInstanceData, PatchTaskInstanceResponse, DeleteTaskInstanceData, DeleteTaskInstanceResponse, GetMappedTaskInstancesData, GetMappedTaskInstancesResponse, GetTaskInstanceDependenciesByMapIndexData, GetTaskInstanceDependenciesByMapIndexResponse, GetTaskInstanceDependenciesData, GetTaskInstanceDependenciesResponse, GetTaskInstanceTriesData, GetTaskInstanceTriesResponse, GetMappedTaskInstanceTriesData, GetMappedTaskInstanceTriesResponse, GetMappedTaskInstanceData, GetMappedTaskInstanceResponse, PatchTaskInstanceByMapIndexData, PatchTaskInstanceByMapIndexResponse, GetTaskInstancesData, GetTaskInstancesResponse, BulkTaskInstancesData, BulkTaskInstancesResponse, GetTaskInstancesBatchData, GetTaskInstancesBatchResponse, GetTaskInstanceTryDetailsData, GetTaskInstanceTryDetailsResponse, GetMappedTaskInstanceTryDetailsData, GetMappedTaskInstanceTryDetailsResponse, PostClearTaskInstancesData, PostClearTaskInstancesResponse, PatchTaskInstanceDryRunByMapIndexData, PatchTaskInstanceDryRunByMapIndexResponse, PatchTaskInstanceDryRunData, PatchTaskInstanceDryRunResponse, GetLogData, GetLogResponse, GetExternalLogUrlData, GetExternalLogUrlResponse, UpdateHitlDetailData, UpdateHitlDetailResponse, GetHitlDetailData, GetHitlDetailResponse, GetHitlDetailsData, GetHitlDetailsResponse, GetImportErrorData, GetImportErrorResponse, GetImportErrorsData, GetImportErrorsResponse, GetJobsData, GetJobsResponse, GetPluginsData, GetPluginsResponse, ImportErrorsResponse, DeletePoolData, DeletePoolResponse, GetPoolData, GetPoolResponse, PatchPoolData, PatchPoolResponse, GetPoolsData, GetPoolsResponse, PostPoolData, PostPoolResponse, BulkPoolsData, BulkPoolsResponse, GetProvidersData, GetProvidersResponse, GetXcomEntryData, GetXcomEntryResponse, UpdateXcomEntryData, UpdateXcomEntryResponse, DeleteXcomEntryData, DeleteXcomEntryResponse, GetXcomEntriesData, GetXcomEntriesResponse, CreateXcomEntryData, CreateXcomEntryResponse, GetTasksData, GetTasksResponse, GetTaskData, GetTaskResponse, DeleteVariableData, DeleteVariableResponse, GetVariableData, GetVariableResponse, PatchVariableData, PatchVariableResponse, GetVariablesData, GetVariablesResponse, PostVariableData, PostVariableResponse, BulkVariablesData, BulkVariablesResponse, ReparseDagFileData, ReparseDagFileResponse, GetDagVersionData, GetDagVersionResponse, GetDagVersionsData, GetDagVersionsResponse, GetHealthResponse, GetVersionResponse, LoginData, LoginResponse, LogoutResponse, GetAuthMenusResponse, GetCurrentUserInfoResponse, GetDependenciesData, GetDependenciesResponse, HistoricalMetricsData, HistoricalMetricsResponse, DagStatsResponse2, StructureDataData, StructureDataResponse2, GetDagStructureData, GetDagStructureResponse, GetGridRunsData, GetGridRunsResponse, GetGridTiSummariesData, GetGridTiSummariesResponse, GetCalendarData, GetCalendarResponse, ListTeamsData, ListTeamsResponse } from './types.gen'; export class AssetService { /** @@ -3264,6 +3264,41 @@ export class XcomService { }); } + /** + * Delete Xcom Entry + * Delete an XCom entry. + * @param data The data for the request. + * @param data.dagId + * @param data.taskId + * @param data.dagRunId + * @param data.xcomKey + * @param data.mapIndex + * @returns void Successful Response + * @throws ApiError + */ + public static deleteXcomEntry(data: DeleteXcomEntryData): CancelablePromise { + return __request(OpenAPI, { + method: 'DELETE', + url: '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}', + path: { + dag_id: data.dagId, + task_id: data.taskId, + dag_run_id: data.dagRunId, + xcom_key: data.xcomKey + }, + query: { + map_index: data.mapIndex + }, + errors: { + 400: 'Bad Request', + 401: 'Unauthorized', + 403: 'Forbidden', + 404: 'Not Found', + 422: 'Validation Error' + } + }); + } + /** * Get Xcom Entries * Get all XCom entries. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 024485ac0df6b..273e8ccdfc841 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -3219,6 +3219,16 @@ export type UpdateXcomEntryData = { export type UpdateXcomEntryResponse = XComResponseNative; +export type DeleteXcomEntryData = { + dagId: string; + dagRunId: string; + mapIndex?: number; + taskId: string; + xcomKey: string; +}; + +export type DeleteXcomEntryResponse = void; + export type GetXcomEntriesData = { /** * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. @@ -6048,6 +6058,35 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + delete: { + req: DeleteXcomEntryData; + res: { + /** + * Successful Response + */ + 204: void; + /** + * Bad Request + */ + 400: HTTPExceptionResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; '/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries': { get: { diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/browse.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/browse.json index 235c3ef4aa2a0..d92d59b9272c8 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/browse.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/browse.json @@ -12,11 +12,35 @@ "title": "Audit Log" }, "xcom": { + "add": { + "error": "Failed to add XCom", + "errorTitle": "Error", + "success": "XCom added successfully", + "successTitle": "XCom Added", + "title": "Add XCom" + }, "columns": { "dag": "Dag", "key": "Key", "value": "Value" }, - "title": "XCom" + "delete": { + "error": "Failed to delete XCom", + "errorTitle": "Error", + "success": "XCom deleted successfully", + "successTitle": "XCom Deleted", + "title": "Delete XCom", + "warning": "Are you sure you want to delete this XCom? This action cannot be undone." + }, + "edit": { + "error": "Failed to update XCom", + "errorTitle": "Error", + "success": "XCom updated successfully", + "successTitle": "XCom Updated", + "title": "Edit XCom" + }, + "key": "Key", + "title": "XCom", + "value": "Value" } } diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index f8d123d4e83b5..aeded55bcf444 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -74,6 +74,7 @@ "dagWarnings": "Dag warnings/errors", "defaultToGraphView": "Default to graph view", "defaultToGridView": "Default to grid view", + "delete": "Delete", "diff": "Diff", "diffCompareWith": "Compare with", "diffExit": "Exit Diff", @@ -89,6 +90,7 @@ "tooltip": "Press {{hotkey}} to download logs" }, "duration": "Duration", + "edit": "Edit", "endDate": "End Date", "error": { "back": "Back", @@ -128,12 +130,14 @@ "logoutConfirmation": "You are about to logout from the application.", "mapIndex": "Map Index", "modal": { + "add": "Add", "cancel": "Cancel", "confirm": "Confirm", "delete": { "button": "Delete", "confirmation": "Are you sure you want to delete {{resourceName}}? This action cannot be undone." - } + }, + "save": "Save" }, "nav": { "admin": "Admin", diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/AddXComButton.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/AddXComButton.tsx new file mode 100644 index 0000000000000..3e8f908b56712 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/XCom/AddXComButton.tsx @@ -0,0 +1,57 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { useDisclosure } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiPlus } from "react-icons/fi"; + +import { Button } from "src/components/ui"; + +import XComModal from "./XComModal"; + +type AddXComButtonProps = { + readonly dagId: string; + readonly mapIndex: number; + readonly runId: string; + readonly taskId: string; +}; + +const AddXComButton = ({ dagId, mapIndex, runId, taskId }: AddXComButtonProps) => { + const { t: translate } = useTranslation("browse"); + const { onClose, onOpen, open } = useDisclosure(); + + return ( + <> + + + + + ); +}; + +export default AddXComButton; diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/DeleteXComButton.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/DeleteXComButton.tsx new file mode 100644 index 0000000000000..b0ccdc7e91b85 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/XCom/DeleteXComButton.tsx @@ -0,0 +1,96 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { IconButton, useDisclosure } from "@chakra-ui/react"; +import { useMutation, useQueryClient } from "@tanstack/react-query"; +import { useTranslation } from "react-i18next"; +import { FiTrash2 } from "react-icons/fi"; + +import { useXcomServiceGetXcomEntriesKey } from "openapi/queries"; +import { XcomService } from "openapi/requests/services.gen"; +import type { XComResponse } from "openapi/requests/types.gen"; +import type { DeleteXcomEntryData } from "openapi/requests/types.gen"; +import DeleteDialog from "src/components/DeleteDialog"; +import { toaster } from "src/components/ui"; + +type DeleteXComButtonProps = { + readonly xcom: XComResponse; +}; + +const DeleteXComButton = ({ xcom }: DeleteXComButtonProps) => { + const { t: translate } = useTranslation("browse"); + const { onClose, onOpen, open } = useDisclosure(); + const queryClient = useQueryClient(); + + const { isPending, mutate } = useMutation({ + mutationFn: (deleteData: DeleteXcomEntryData) => XcomService.deleteXcomEntry(deleteData), + onError: () => { + toaster.create({ + description: translate("xcom.delete.error"), + title: translate("xcom.delete.errorTitle"), + type: "error", + }); + }, + onSuccess: async () => { + await queryClient.invalidateQueries({ + queryKey: [useXcomServiceGetXcomEntriesKey], + }); + onClose(); + toaster.create({ + description: translate("xcom.delete.success"), + title: translate("xcom.delete.successTitle"), + type: "success", + }); + }, + }); + + const handleDelete = () => { + mutate({ + dagId: xcom.dag_id, + dagRunId: xcom.run_id, + mapIndex: xcom.map_index, + taskId: xcom.task_id, + xcomKey: xcom.key, + }); + }; + + return ( + <> + + + + + + + ); +}; + +export default DeleteXComButton; diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/EditXComButton.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/EditXComButton.tsx new file mode 100644 index 0000000000000..9c6c06ae44c44 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/XCom/EditXComButton.tsx @@ -0,0 +1,55 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { IconButton, useDisclosure } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; +import { FiEdit2 } from "react-icons/fi"; + +import type { XComResponse } from "openapi/requests/types.gen"; + +import XComModal from "./XComModal"; + +type EditXComButtonProps = { + readonly xcom: XComResponse; +}; + +const EditXComButton = ({ xcom }: EditXComButtonProps) => { + const { t: translate } = useTranslation("browse"); + const { onClose, onOpen, open } = useDisclosure(); + + return ( + <> + + + + + + + ); +}; + +export default EditXComButton; diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx index 6a7c0c80a20b0..90571e3854a50 100644 --- a/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx +++ b/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -import { Box, Heading, Link, Flex, useDisclosure } from "@chakra-ui/react"; +import { Box, Flex, Heading, Link, useDisclosure } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import { useMemo } from "react"; import { useTranslation } from "react-i18next"; @@ -33,6 +33,9 @@ import { TruncatedText } from "src/components/TruncatedText"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { getTaskInstanceLink } from "src/utils/links"; +import AddXComButton from "./AddXComButton"; +import DeleteXComButton from "./DeleteXComButton"; +import EditXComButton from "./EditXComButton"; import { XComEntry } from "./XComEntry"; import { XComFilters } from "./XComFilters"; @@ -44,7 +47,12 @@ const { TASK_ID_PATTERN: TASK_ID_PATTERN_PARAM, }: SearchParamsKeysType = SearchParamsKeys; -const columns = (translate: (key: string) => string, open: boolean): Array> => [ +type ColumnsProps = { + readonly open: boolean; + readonly translate: (key: string) => string; +}; + +const columns = ({ open, translate }: ColumnsProps): Array> => [ { accessorKey: "key", enableSorting: false, @@ -116,6 +124,17 @@ const columns = (translate: (key: string) => string, open: boolean): Array ( + + + + + ), + enableSorting: false, + header: "", + }, ]; export const XCom = () => { @@ -162,7 +181,16 @@ export const XCom = () => { const { data, error, isFetching, isLoading } = useXcomServiceGetXcomEntries(apiParams, undefined); - const memoizedColumns = useMemo(() => columns(translate, open), [translate, open]); + const memoizedColumns = useMemo( + () => + columns({ + open, + translate, + }), + [open, translate], + ); + + const isTaskInstancePage = dagId !== "~" && runId !== "~" && taskId !== "~"; return ( @@ -172,12 +200,22 @@ export const XCom = () => { - + + {isTaskInstancePage ? ( + + ) : undefined} + + diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/XComModal.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/XComModal.tsx new file mode 100644 index 0000000000000..1dfbacd39d79d --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/XCom/XComModal.tsx @@ -0,0 +1,213 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { Box, Heading, Input, Text, VStack } from "@chakra-ui/react"; +import { useQueryClient } from "@tanstack/react-query"; +import { useEffect, useState } from "react"; +import { useTranslation } from "react-i18next"; + +import { + useXcomServiceCreateXcomEntry, + useXcomServiceGetXcomEntriesKey, + useXcomServiceGetXcomEntry, + useXcomServiceGetXcomEntryKey, + useXcomServiceUpdateXcomEntry, +} from "openapi/queries"; +import type { XComResponseNative } from "openapi/requests/types.gen"; +import { JsonEditor } from "src/components/JsonEditor"; +import { Button, Dialog, ProgressBar, toaster } from "src/components/ui"; + +type XComModalProps = { + readonly dagId: string; + readonly isOpen: boolean; + readonly mapIndex: number; + readonly mode: "add" | "edit"; + readonly onClose: () => void; + readonly runId: string; + readonly taskId: string; + readonly xcomKey?: string; +}; + +const XComModal = ({ dagId, isOpen, mapIndex, mode, onClose, runId, taskId, xcomKey }: XComModalProps) => { + const { t: translate } = useTranslation(["browse", "common"]); + const queryClient = useQueryClient(); + const [key, setKey] = useState(""); + const [value, setValue] = useState(""); + + const isEditMode = mode === "edit"; + + // Fetch existing XCom data when in edit mode + const { data, isLoading } = useXcomServiceGetXcomEntry( + { + dagId, + dagRunId: runId, + deserialize: true, + mapIndex, + taskId, + xcomKey: xcomKey ?? "", + }, + undefined, + { enabled: isOpen && isEditMode && Boolean(xcomKey) }, + ); + + // Populate form when editing + useEffect(() => { + if (isEditMode && data?.value !== undefined) { + const val = data.value; + + setValue(typeof val === "string" ? val : JSON.stringify(val, undefined, 2)); + } + }, [data, isEditMode]); + + // Reset form when modal closes + useEffect(() => { + if (!isOpen) { + setKey(""); + setValue(""); + } + }, [isOpen]); + + // Create mutation + const { isPending: isCreating, mutate: createXCom } = useXcomServiceCreateXcomEntry({ + onError: () => { + toaster.create({ + description: translate("browse:xcom.add.error"), + title: translate("browse:xcom.add.errorTitle"), + type: "error", + }); + }, + onSuccess: async () => { + await queryClient.invalidateQueries({ + queryKey: [useXcomServiceGetXcomEntriesKey], + }); + onClose(); + toaster.create({ + description: translate("browse:xcom.add.success"), + title: translate("browse:xcom.add.successTitle"), + type: "success", + }); + }, + }); + + // Update mutation + const { isPending: isUpdating, mutate: updateXCom } = useXcomServiceUpdateXcomEntry({ + onError: () => { + toaster.create({ + description: translate("browse:xcom.edit.error"), + title: translate("browse:xcom.edit.errorTitle"), + type: "error", + }); + }, + onSuccess: async () => { + await queryClient.invalidateQueries({ + queryKey: [useXcomServiceGetXcomEntriesKey], + }); + await queryClient.invalidateQueries({ + queryKey: [useXcomServiceGetXcomEntryKey], + }); + onClose(); + toaster.create({ + description: translate("browse:xcom.edit.success"), + title: translate("browse:xcom.edit.successTitle"), + type: "success", + }); + }, + }); + + const onSave = () => { + let parsedValue: unknown = value; + + try { + parsedValue = JSON.parse(value) as unknown; + } catch { + // use string + } + + if (isEditMode && xcomKey !== undefined) { + updateXCom({ + dagId, + dagRunId: runId, + requestBody: { + map_index: mapIndex, + value: parsedValue, + }, + taskId, + xcomKey, + }); + } else { + createXCom({ + dagId, + dagRunId: runId, + requestBody: { + key, + map_index: mapIndex, + value: parsedValue, + }, + taskId, + }); + } + }; + + const isPending = isCreating || isUpdating; + const title = isEditMode ? translate("browse:xcom.edit.title") : translate("browse:xcom.add.title"); + + return ( + + + + {title} + + + + {isLoading ? ( + + ) : ( + + + + {translate("browse:xcom.key")} + + {isEditMode ? ( + {xcomKey} + ) : ( + setKey(event.target.value)} value={key} /> + )} + + + + {translate("browse:xcom.value")} + + setValue(val)} value={value} /> + + + )} + + + + + + + + ); +}; + +export default XComModal; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py index 39d47aa06461a..ccfceb8c4526f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py @@ -568,6 +568,36 @@ class TestCreateXComEntry(TestXComEndpoint): None, id="valid-xcom-entry", ), + # Test case: Valid input with string value + pytest.param( + TEST_DAG_ID, + TEST_TASK_ID, + run_id, + XComCreateBody(key="string_key", value="simple string"), + 201, + None, + id="valid-xcom-with-string-value", + ), + # Test case: Valid input with integer value + pytest.param( + TEST_DAG_ID, + TEST_TASK_ID, + run_id, + XComCreateBody(key="int_key", value=42), + 201, + None, + id="valid-xcom-with-integer-value", + ), + # Test case: Valid input with list value + pytest.param( + TEST_DAG_ID, + TEST_TASK_ID, + run_id, + XComCreateBody(key="list_key", value=[1, 2, 3]), + 201, + None, + id="valid-xcom-with-list-value", + ), # Test case: DAG not found pytest.param( "invalid-dag-id", @@ -676,6 +706,42 @@ def test_create_xcom_entry_with_slash_key(self, test_client): assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE) +class TestDeleteXComEntry(TestXComEndpoint): + def test_delete_xcom_entry(self, test_client, session): + self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) + + response = test_client.delete( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" + ) + + assert response.status_code == 204 + check_last_log(session, dag_id=TEST_DAG_ID, event="delete_xcom_entry", logical_date=None) + + # Verify it's gone + response = test_client.get( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" + ) + assert response.status_code == 404 + + def test_delete_xcom_entry_not_found(self, test_client): + response = test_client.delete( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" + ) + assert response.status_code == 404 + + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.delete( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" + ) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.delete( + f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{TEST_XCOM_KEY}" + ) + assert response.status_code == 403 + + class TestPatchXComEntry(TestXComEndpoint): @pytest.mark.parametrize( ("key", "patch_body", "expected_status", "expected_detail"), @@ -688,6 +754,30 @@ class TestPatchXComEntry(TestXComEndpoint): None, id="valid-xcom-update", ), + # Test case: Update with complex object + pytest.param( + TEST_XCOM_KEY, + {"value": {"nested": {"key": "value"}, "list": [1, 2, 3]}}, + 200, + None, + id="valid-xcom-update-complex-object", + ), + # Test case: Update with integer + pytest.param( + TEST_XCOM_KEY, + {"value": 999}, + 200, + None, + id="valid-xcom-update-integer", + ), + # Test case: Update with list + pytest.param( + TEST_XCOM_KEY, + {"value": ["updated", "list", "values"]}, + 200, + None, + id="valid-xcom-update-list", + ), # Test case: XCom entry does not exist, should return 404 pytest.param( TEST_XCOM_KEY, @@ -702,8 +792,6 @@ def test_patch_xcom_entry(self, key, patch_body, expected_status, expected_detai # Ensure the XCom entry exists before updating if expected_status != 404: self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE) - # The value is double-serialized: first json.dumps(patch_body["value"]), then json.dumps() again - new_value = json.dumps(json.dumps(patch_body["value"])) response = test_client.patch( f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}", @@ -713,7 +801,7 @@ def test_patch_xcom_entry(self, key, patch_body, expected_status, expected_detai assert response.status_code == expected_status if expected_status == 200: - assert response.json()["value"] == new_value + assert response.json()["value"] == json.dumps(patch_body["value"]) else: assert response.json()["detail"] == expected_detail check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None) @@ -742,6 +830,5 @@ def test_patch_xcom_entry_with_slash_key(self, test_client, session): ) assert response.status_code == 200 assert response.json()["key"] == slash_key - # The value is double-serialized: first json.dumps(new_value), then json.dumps() again - assert response.json()["value"] == json.dumps(json.dumps(new_value)) + assert response.json()["value"] == json.dumps(new_value) check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry", logical_date=None)