diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 46f8eef89a029..4e98d028aa815 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -57,6 +57,7 @@ from airflow.models.pool import Pool from airflow.models.taskinstance import TaskInstance from airflow.models.variable import Variable +from airflow.models.xcom import XComModel from airflow.typing_compat import Self from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunType @@ -733,6 +734,21 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N ), ] +# XCom +QueryXComKeyPatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(XComModel.key, "xcom_key_pattern")) +] + +QueryXComDagDisplayNamePatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(DagModel.dag_display_name, "dag_display_name_pattern")) +] +QueryXComRunIdPatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(XComModel.run_id, "run_id_pattern")) +] +QueryXComTaskIdPatternSearch = Annotated[ + _SearchParam, Depends(search_param_factory(XComModel.task_id, "task_id_pattern")) +] + # Assets QueryAssetNamePatternSearch = Annotated[ _SearchParam, Depends(search_param_factory(AssetModel.name, "name_pattern")) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index f42e1846bdfd3..ac565cec76c39 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -4667,6 +4667,98 @@ paths: minimum: 0 default: 0 title: Offset + - name: xcom_key_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Xcom Key Pattern + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: dag_display_name_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Dag Display Name Pattern + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: run_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Run Id Pattern + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: task_id_pattern + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + title: Task Id Pattern + description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\ + \ Regular expressions are **not** supported." + - name: map_index_filter + in: query + required: false + schema: + anyOf: + - type: integer + - type: 'null' + title: Map Index Filter + - name: logical_date_gte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date Gte + - name: logical_date_lte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Logical Date Lte + - name: run_after_gte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Gte + - name: run_after_lte + in: query + required: false + schema: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Run After Lte responses: '200': description: Successful Response diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py index d3c2721c161e9..2cdb89fcc88d9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py @@ -26,7 +26,18 @@ from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run_or_latest_version from airflow.api_fastapi.common.db.common import SessionDep, paginated_select -from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset +from airflow.api_fastapi.common.parameters import ( + FilterParam, + QueryLimit, + QueryOffset, + QueryXComDagDisplayNamePatternSearch, + QueryXComKeyPatternSearch, + QueryXComRunIdPatternSearch, + QueryXComTaskIdPatternSearch, + RangeFilter, + datetime_range_filter_factory, + filter_param_factory, +) from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.xcom import ( XComCollectionResponse, @@ -40,6 +51,7 @@ from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import TaskNotFound from airflow.models import DagRun as DR +from airflow.models.dag import DagModel from airflow.models.xcom import XComModel xcom_router = AirflowRouter( @@ -127,6 +139,16 @@ def get_xcom_entries( offset: QueryOffset, readable_xcom_filter: ReadableXComFilterDep, session: SessionDep, + xcom_key_pattern: QueryXComKeyPatternSearch, + dag_display_name_pattern: QueryXComDagDisplayNamePatternSearch, + run_id_pattern: QueryXComRunIdPatternSearch, + task_id_pattern: QueryXComTaskIdPatternSearch, + map_index_filter: Annotated[ + FilterParam[int | None], + Depends(filter_param_factory(XComModel.map_index, int | None, filter_name="map_index_filter")), + ], + logical_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DR))], + run_after_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DR))], xcom_key: Annotated[str | None, Query()] = None, map_index: Annotated[int | None, Query(ge=-1)] = None, ) -> XComCollectionResponse: @@ -138,8 +160,10 @@ def get_xcom_entries( query = select(XComModel) if dag_id != "~": query = query.where(XComModel.dag_id == dag_id) - query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id)).options( - joinedload(XComModel.dag_run).joinedload(DR.dag_model) + query = ( + query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id)) + .join(DagModel, DR.dag_id == DagModel.dag_id) + .options(joinedload(XComModel.dag_run).joinedload(DR.dag_model)) ) if task_id != "~": @@ -153,7 +177,16 @@ def get_xcom_entries( query, total_entries = paginated_select( statement=query, - filters=[readable_xcom_filter], + filters=[ + readable_xcom_filter, + xcom_key_pattern, + dag_display_name_pattern, + run_id_pattern, + task_id_pattern, + map_index_filter, + logical_date_range, + run_after_range, + ], offset=offset, limit=limit, session=session, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 65a87575ad64e..1c7ded5f8f422 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -574,15 +574,24 @@ export const UseXcomServiceGetXcomEntryKeyFn = ({ dagId, dagRunId, deserialize, export type XcomServiceGetXcomEntriesDefaultResponse = Awaited>; export type XcomServiceGetXcomEntriesQueryResult = UseQueryResult; export const useXcomServiceGetXcomEntriesKey = "XcomServiceGetXcomEntries"; -export const UseXcomServiceGetXcomEntriesKeyFn = ({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: { +export const UseXcomServiceGetXcomEntriesKeyFn = ({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: { + dagDisplayNamePattern?: string; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string; + logicalDateLte?: string; mapIndex?: number; + mapIndexFilter?: number; offset?: number; + runAfterGte?: string; + runAfterLte?: string; + runIdPattern?: string; taskId: string; + taskIdPattern?: string; xcomKey?: string; -}, queryKey?: Array) => [useXcomServiceGetXcomEntriesKey, ...(queryKey ?? [{ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }])]; + xcomKeyPattern?: string; +}, queryKey?: Array) => [useXcomServiceGetXcomEntriesKey, ...(queryKey ?? [{ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }])]; export type TaskServiceGetTasksDefaultResponse = Awaited>; export type TaskServiceGetTasksQueryResult = UseQueryResult; export const useTaskServiceGetTasksKey = "TaskServiceGetTasks"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index 7a18c205ce06b..7586b0167bed6 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -1088,18 +1088,36 @@ export const ensureUseXcomServiceGetXcomEntryData = (queryClient: QueryClient, { * @param data.mapIndex * @param data.limit * @param data.offset +* @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.mapIndexFilter +* @param data.logicalDateGte +* @param data.logicalDateLte +* @param data.runAfterGte +* @param data.runAfterLte * @returns XComCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseXcomServiceGetXcomEntriesData = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: { +export const ensureUseXcomServiceGetXcomEntriesData = (queryClient: QueryClient, { dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: { + dagDisplayNamePattern?: string; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string; + logicalDateLte?: string; mapIndex?: number; + mapIndexFilter?: number; offset?: number; + runAfterGte?: string; + runAfterLte?: string; + runIdPattern?: string; taskId: string; + taskIdPattern?: string; xcomKey?: string; -}) => queryClient.ensureQueryData({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) }); + xcomKeyPattern?: string; +}) => queryClient.ensureQueryData({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index d4df09ab1c730..68efa90db53f9 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -1088,18 +1088,36 @@ export const prefetchUseXcomServiceGetXcomEntry = (queryClient: QueryClient, { d * @param data.mapIndex * @param data.limit * @param data.offset +* @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. +* @param data.mapIndexFilter +* @param data.logicalDateGte +* @param data.logicalDateLte +* @param data.runAfterGte +* @param data.runAfterLte * @returns XComCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseXcomServiceGetXcomEntries = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: { +export const prefetchUseXcomServiceGetXcomEntries = (queryClient: QueryClient, { dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: { + dagDisplayNamePattern?: string; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string; + logicalDateLte?: string; mapIndex?: number; + mapIndexFilter?: number; offset?: number; + runAfterGte?: string; + runAfterLte?: string; + runIdPattern?: string; taskId: string; + taskIdPattern?: string; xcomKey?: string; -}) => queryClient.prefetchQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) }); + xcomKeyPattern?: string; +}) => queryClient.prefetchQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index f5b65f7596b10..3557a30f759c4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -1088,18 +1088,36 @@ export const useXcomServiceGetXcomEntry = = unknown[]>({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: { +export const useXcomServiceGetXcomEntries = = unknown[]>({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: { + dagDisplayNamePattern?: string; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string; + logicalDateLte?: string; mapIndex?: number; + mapIndexFilter?: number; offset?: number; + runAfterGte?: string; + runAfterLte?: string; + runIdPattern?: string; taskId: string; + taskIdPattern?: string; xcomKey?: string; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) as TData, ...options }); + xcomKeyPattern?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) as TData, ...options }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index d2b2ee9fcd6bd..f3e532afa4c14 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -1088,18 +1088,36 @@ export const useXcomServiceGetXcomEntrySuspense = = unknown[]>({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: { +export const useXcomServiceGetXcomEntriesSuspense = = unknown[]>({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: { + dagDisplayNamePattern?: string; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string; + logicalDateLte?: string; mapIndex?: number; + mapIndexFilter?: number; offset?: number; + runAfterGte?: string; + runAfterLte?: string; + runIdPattern?: string; taskId: string; + taskIdPattern?: string; xcomKey?: string; -}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) as TData, ...options }); + xcomKeyPattern?: string; +}, queryKey?: TQueryKey, options?: Omit, "queryKey" | "queryFn">) => useSuspenseQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) as TData, ...options }); /** * Get Tasks * Get tasks for DAG. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index 3ddb842f32238..d23551154d52c 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -3018,6 +3018,15 @@ export class XcomService { * @param data.mapIndex * @param data.limit * @param data.offset + * @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + * @param data.mapIndexFilter + * @param data.logicalDateGte + * @param data.logicalDateLte + * @param data.runAfterGte + * @param data.runAfterLte * @returns XComCollectionResponse Successful Response * @throws ApiError */ @@ -3034,7 +3043,16 @@ export class XcomService { xcom_key: data.xcomKey, map_index: data.mapIndex, limit: data.limit, - offset: data.offset + offset: data.offset, + xcom_key_pattern: data.xcomKeyPattern, + dag_display_name_pattern: data.dagDisplayNamePattern, + run_id_pattern: data.runIdPattern, + task_id_pattern: data.taskIdPattern, + map_index_filter: data.mapIndexFilter, + logical_date_gte: data.logicalDateGte, + logical_date_lte: data.logicalDateLte, + run_after_gte: data.runAfterGte, + run_after_lte: data.runAfterLte }, errors: { 400: 'Bad Request', diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index b0f1a6e5d27ac..b082d41943d53 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2810,13 +2810,34 @@ export type UpdateXcomEntryData = { export type UpdateXcomEntryResponse = XComResponseNative; export type GetXcomEntriesData = { + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + dagDisplayNamePattern?: string | null; dagId: string; dagRunId: string; limit?: number; + logicalDateGte?: string | null; + logicalDateLte?: string | null; mapIndex?: number | null; + mapIndexFilter?: number | null; offset?: number; + runAfterGte?: string | null; + runAfterLte?: string | null; + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + runIdPattern?: string | null; taskId: string; + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + taskIdPattern?: string | null; xcomKey?: string | null; + /** + * SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported. + */ + xcomKeyPattern?: string | null; }; export type GetXcomEntriesResponse = XComCollectionResponse; diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 8dda576063e0b..74950ad83f572 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -92,6 +92,17 @@ "any": "Any", "or": "OR" }, + "filters": { + "dagDisplayNamePlaceholder": "Filter by Dag", + "keyPlaceholder": "Filter by XCom key", + "logicalDateFromPlaceholder": "Logical Date From", + "logicalDateToPlaceholder": "Logical Date To", + "mapIndexPlaceholder": "Filter by Map Index", + "runAfterFromPlaceholder": "Run After From", + "runAfterToPlaceholder": "Run After To", + "runIdPlaceholder": "Filter by Run ID", + "taskIdPlaceholder": "Filter by Task ID" + }, "logicalDate": "Logical Date", "logout": "Logout", "logoutConfirmation": "You are about to logout from the application.", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index 1423588781385..62dce303ca608 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -92,6 +92,16 @@ "any": "任何", "or": "或" }, + "filters": { + "dagDisplayNamePlaceholder": "依 Dag 名稱篩選", + "keyPlaceholder": "依 XCom 鍵篩選", + "logicalDateFromPlaceholder": "邏輯日期起始", + "logicalDateToPlaceholder": "邏輯日期結束", + "runAfterFromPlaceholder": "執行時間起始", + "runAfterToPlaceholder": "執行時間結束", + "runIdPlaceholder": "依執行 ID 篩選", + "taskIdPlaceholder": "依任務 ID 篩選" + }, "logicalDate": "邏輯日期", "logout": "登出", "logoutConfirmation": "確定要登出嗎?", diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts b/airflow-core/src/airflow/ui/src/constants/searchParams.ts index 91a08a332898e..d333695251df4 100644 --- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts +++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts @@ -17,18 +17,26 @@ * under the License. */ export enum SearchParamsKeys { + DAG_DISPLAY_NAME_PATTERN = "dag_display_name_pattern", + DAG_ID_PATTERN = "dag_id_pattern", DEPENDENCIES = "dependencies", END_DATE = "end_date", FAVORITE = "favorite", + KEY_PATTERN = "key_pattern", LAST_DAG_RUN_STATE = "last_dag_run_state", LIMIT = "limit", LOG_LEVEL = "log_level", + LOGICAL_DATE_GTE = "logical_date_gte", + LOGICAL_DATE_LTE = "logical_date_lte", + MAP_INDEX = "map_index", NAME_PATTERN = "name_pattern", OFFSET = "offset", OWNERS = "owners", PAUSED = "paused", POOL = "pool", RESPONSE_RECEIVED = "response_received", + RUN_AFTER_GTE = "run_after_gte", + RUN_AFTER_LTE = "run_after_lte", RUN_ID_PATTERN = "run_id_pattern", RUN_TYPE = "run_type", SORT = "sort", @@ -37,6 +45,7 @@ export enum SearchParamsKeys { STATE = "state", TAGS = "tags", TAGS_MATCH_MODE = "tags_match_mode", + TASK_ID_PATTERN = "task_id_pattern", TRIGGERING_USER_NAME_PATTERN = "triggering_user_name_pattern", TRY_NUMBER = "try_number", VERSION_NUMBER = "version_number", diff --git a/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx b/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx index 842bd7ac7d4f0..50baab00bda0d 100644 --- a/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx +++ b/airflow-core/src/airflow/ui/src/pages/XCom/XCom.tsx @@ -19,7 +19,7 @@ import { Box, Heading, Link } from "@chakra-ui/react"; import type { ColumnDef } from "@tanstack/react-table"; import { useTranslation } from "react-i18next"; -import { Link as RouterLink, useParams } from "react-router-dom"; +import { Link as RouterLink, useParams, useSearchParams } from "react-router-dom"; import { useXcomServiceGetXcomEntries } from "openapi/queries"; import type { XComResponse } from "openapi/requests/types.gen"; @@ -27,9 +27,19 @@ import { DataTable } from "src/components/DataTable"; import { useTableURLState } from "src/components/DataTable/useTableUrlState"; import { ErrorAlert } from "src/components/ErrorAlert"; import { TruncatedText } from "src/components/TruncatedText"; +import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; import { getTaskInstanceLinkFromObj } from "src/utils/links"; import { XComEntry } from "./XComEntry"; +import { XComFilters } from "./XComFilters"; + +const { + DAG_DISPLAY_NAME_PATTERN: DAG_DISPLAY_NAME_PATTERN_PARAM, + KEY_PATTERN: KEY_PATTERN_PARAM, + MAP_INDEX: MAP_INDEX_PARAM, + RUN_ID_PATTERN: RUN_ID_PATTERN_PARAM, + TASK_ID_PATTERN: TASK_ID_PATTERN_PARAM, +}: SearchParamsKeysType = SearchParamsKeys; const columns = (translate: (key: string) => string): Array> => [ { @@ -103,25 +113,55 @@ export const XCom = () => { const { t: translate } = useTranslation(["browse", "common"]); const { setTableURLState, tableURLState } = useTableURLState(); const { pagination } = tableURLState; + const [searchParams] = useSearchParams(); - const { data, error, isFetching, isLoading } = useXcomServiceGetXcomEntries( - { - dagId, - dagRunId: runId, - limit: pagination.pageSize, - mapIndex: mapIndex === "-1" ? undefined : parseInt(mapIndex, 10), - offset: pagination.pageIndex * pagination.pageSize, - taskId, - }, - undefined, - { enabled: !isNaN(pagination.pageSize) }, - ); + const filteredKey = searchParams.get(KEY_PATTERN_PARAM); + const filteredDagDisplayName = searchParams.get(DAG_DISPLAY_NAME_PATTERN_PARAM); + const filteredMapIndex = searchParams.get(MAP_INDEX_PARAM); + const filteredRunId = searchParams.get(RUN_ID_PATTERN_PARAM); + const filteredTaskId = searchParams.get(TASK_ID_PATTERN_PARAM); + + const { LOGICAL_DATE_GTE, LOGICAL_DATE_LTE, RUN_AFTER_GTE, RUN_AFTER_LTE } = SearchParamsKeys; + + const logicalDateGte = searchParams.get(LOGICAL_DATE_GTE); + const logicalDateLte = searchParams.get(LOGICAL_DATE_LTE); + const runAfterGte = searchParams.get(RUN_AFTER_GTE); + const runAfterLte = searchParams.get(RUN_AFTER_LTE); + + const apiParams = { + dagDisplayNamePattern: filteredDagDisplayName ?? undefined, + dagId, + dagRunId: runId, + limit: pagination.pageSize, + logicalDateGte: logicalDateGte ?? undefined, + logicalDateLte: logicalDateLte ?? undefined, + mapIndex: + filteredMapIndex !== null && filteredMapIndex !== "" + ? parseInt(filteredMapIndex, 10) + : mapIndex === "-1" + ? undefined + : parseInt(mapIndex, 10), + offset: pagination.pageIndex * pagination.pageSize, + runAfterGte: runAfterGte ?? undefined, + runAfterLte: runAfterLte ?? undefined, + runIdPattern: filteredRunId ?? undefined, + taskId, + taskIdPattern: filteredTaskId ?? undefined, + xcomKeyPattern: filteredKey ?? undefined, + }; + + const { data, error, isFetching, isLoading } = useXcomServiceGetXcomEntries(apiParams, undefined, { + enabled: !isNaN(pagination.pageSize), + }); return ( {dagId === "~" && runId === "~" && taskId === "~" ? ( {translate("xcom.title")} ) : undefined} + + + ; + +export const XComFilters = () => { + const [searchParams, setSearchParams] = useSearchParams(); + const { dagId = "~", mapIndex = "-1", runId = "~", taskId = "~" } = useParams(); + const { setTableURLState, tableURLState } = useTableURLState(); + const { pagination, sorting } = tableURLState; + const { t: translate } = useTranslation(["browse", "common"]); + const [resetKey, setResetKey] = useState(0); + + const visibleFilters = useMemo( + () => + FILTERS.filter((filter) => { + switch (filter.key) { + case SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN: + return dagId === "~"; + case SearchParamsKeys.KEY_PATTERN: + case SearchParamsKeys.LOGICAL_DATE_GTE: + case SearchParamsKeys.LOGICAL_DATE_LTE: + case SearchParamsKeys.RUN_AFTER_GTE: + case SearchParamsKeys.RUN_AFTER_LTE: + return true; + case SearchParamsKeys.MAP_INDEX: + return mapIndex === "-1"; + case SearchParamsKeys.RUN_ID_PATTERN: + return runId === "~"; + case SearchParamsKeys.TASK_ID_PATTERN: + return taskId === "~"; + default: + return true; + } + }), + [dagId, mapIndex, runId, taskId], + ); + + const handleFilterChange = useCallback( + (paramKey: string) => (value: string) => { + if (value === "") { + searchParams.delete(paramKey); + } else { + searchParams.set(paramKey, value); + } + setTableURLState({ + pagination: { ...pagination, pageIndex: 0 }, + sorting, + }); + setSearchParams(searchParams); + }, + [pagination, searchParams, setSearchParams, setTableURLState, sorting], + ); + + const filterCount = useMemo( + () => + visibleFilters.filter((filter) => { + const value = searchParams.get(filter.key); + + return value !== null && value !== ""; + }).length, + [searchParams, visibleFilters], + ); + + const handleResetFilters = useCallback(() => { + visibleFilters.forEach((filter) => { + searchParams.delete(filter.key); + }); + setTableURLState({ + pagination: { ...pagination, pageIndex: 0 }, + sorting, + }); + setSearchParams(searchParams); + setResetKey((prev) => prev + 1); + }, [pagination, searchParams, setSearchParams, setTableURLState, sorting, visibleFilters]); + + const renderFilterInput = (filter: (typeof FILTERS)[number]) => { + const { key, translationKey, type } = filter; + + return ( + + + {type !== "search" && {translate(`common:filters.${translationKey}`)}} + + {type === "search" ? ( + (() => { + const { hotkeyDisabled } = filter; + + return ( + + ); + })() + ) : type === "datetime" ? ( + handleFilterChange(key)(event.target.value)} + value={searchParams.get(key) ?? ""} + /> + ) : ( + handleFilterChange(key)(details.value)} + value={searchParams.get(key) ?? ""} + > + + + )} + + ); + }; + + return ( + + + {visibleFilters.map(renderFilterInput)} + + +   + + {filterCount > 0 && ( + + )} + + + + ); +}; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py index 361b3c6e97a14..db001fe2d117a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py @@ -369,7 +369,7 @@ def test_should_respond_200_with_xcom_key(self, key, expected_entries, test_clie self._create_xcom_entries(TEST_DAG_ID, run_id, logical_date_parsed, TEST_TASK_ID, mapped_ti=True) response = test_client.get( "/dags/~/dagRuns/~/taskInstances/~/xcomEntries", - params={"xcom_key": key} if key is not None else None, + params={"xcom_key_pattern": key} if key is not None else None, ) assert response.status_code == 200