Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0dbf93f
feat(ui): Add filters for Key, DAG, Task, and Run ID to XCom page
RoyLee1224 Aug 1, 2025
48c3255
feat: implement basic search with filter pill
RoyLee1224 Aug 3, 2025
b45e340
chore(ui): modify filterpill appearance
RoyLee1224 Aug 3, 2025
92dec9f
style(ui): Polish appearance of filter components
RoyLee1224 Aug 3, 2025
806126f
feat(api, ui): Add wildcard search to XComs and simplify filter UI
RoyLee1224 Aug 5, 2025
93a4064
feat(api): Add dag, run, task wildcard search to XComs
RoyLee1224 Aug 5, 2025
37472f9
refactor(ui): Simplify XCom filters
RoyLee1224 Aug 5, 2025
e119a12
feat(api, ui): Add Logical Date Range, Run After Range filter to Xcoms
RoyLee1224 Aug 6, 2025
e483522
Merge branch 'main' into filters
RoyLee1224 Aug 6, 2025
f583c70
fix(i18n): Modify translations
RoyLee1224 Aug 6, 2025
5bec53f
fix: Update test to use xcom_key_pattern parameter
RoyLee1224 Aug 7, 2025
f3424db
refactor(XCom): Unifying handlers and config
RoyLee1224 Aug 8, 2025
bd0d59a
fix(XCom): Hide filters in task instance Xcom tab
RoyLee1224 Aug 8, 2025
2011c05
feat(UI): Add reset button to XCom filters
RoyLee1224 Aug 8, 2025
2494849
fix(ui, api): filter by dag display name & regroup placeholder transl…
RoyLee1224 Aug 8, 2025
784b1e1
fix(ui): modify translations
RoyLee1224 Aug 8, 2025
c42de31
feat(ui,api): Add mapIndex filter
RoyLee1224 Aug 8, 2025
3c25d14
fix(api,ui): Change map_index filter to exact match
RoyLee1224 Aug 8, 2025
306da82
fix(api,ui): Change map_index filter to exact match
RoyLee1224 Aug 8, 2025
19534e8
fix(api): keep xcom_key and map_index
RoyLee1224 Aug 11, 2025
fa320ed
fix(ui): improve XCom filter UX with NumberInput
RoyLee1224 Aug 11, 2025
d5a4ab6
fix(ui): showing only relevant filters in TI Xcom tab
RoyLee1224 Aug 11, 2025
c68729e
fix(ui): replace unicode placeholder with CSS layout
RoyLee1224 Aug 11, 2025
ca2f0a8
Merge branch 'main' into filters
RoyLee1224 Aug 11, 2025
115f386
Merge branch 'main' into filters
RoyLee1224 Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airflow.models.pool import Pool
from airflow.models.taskinstance import TaskInstance
from airflow.models.variable import Variable
from airflow.models.xcom import XComModel
from airflow.typing_compat import Self
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
Expand Down Expand Up @@ -733,6 +734,21 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
),
]

# XCom
QueryXComKeyPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(XComModel.key, "xcom_key_pattern"))
]

QueryXComDagDisplayNamePatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(DagModel.dag_display_name, "dag_display_name_pattern"))
]
QueryXComRunIdPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(XComModel.run_id, "run_id_pattern"))
]
QueryXComTaskIdPatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(XComModel.task_id, "task_id_pattern"))
]

# Assets
QueryAssetNamePatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(AssetModel.name, "name_pattern"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4667,6 +4667,98 @@ paths:
minimum: 0
default: 0
title: Offset
- name: xcom_key_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Xcom Key Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: dag_display_name_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Dag Display Name Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: run_id_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Run Id Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: task_id_pattern
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
title: Task Id Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g. `%customer_%`).\
\ Regular expressions are **not** supported."
- name: map_index_filter
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Map Index Filter
- name: logical_date_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date Gte
- name: logical_date_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Logical Date Lte
- name: run_after_gte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Run After Gte
- name: run_after_lte
in: query
required: false
schema:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Run After Lte
responses:
'200':
description: Successful Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,18 @@
from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run_or_latest_version
from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
from airflow.api_fastapi.common.parameters import (
FilterParam,
QueryLimit,
QueryOffset,
QueryXComDagDisplayNamePatternSearch,
QueryXComKeyPatternSearch,
QueryXComRunIdPatternSearch,
QueryXComTaskIdPatternSearch,
RangeFilter,
datetime_range_filter_factory,
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.xcom import (
XComCollectionResponse,
Expand All @@ -40,6 +51,7 @@
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import TaskNotFound
from airflow.models import DagRun as DR
from airflow.models.dag import DagModel
from airflow.models.xcom import XComModel

xcom_router = AirflowRouter(
Expand Down Expand Up @@ -127,6 +139,16 @@ def get_xcom_entries(
offset: QueryOffset,
readable_xcom_filter: ReadableXComFilterDep,
session: SessionDep,
xcom_key_pattern: QueryXComKeyPatternSearch,
dag_display_name_pattern: QueryXComDagDisplayNamePatternSearch,
run_id_pattern: QueryXComRunIdPatternSearch,
task_id_pattern: QueryXComTaskIdPatternSearch,
map_index_filter: Annotated[
FilterParam[int | None],
Depends(filter_param_factory(XComModel.map_index, int | None, filter_name="map_index_filter")),
],
logical_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DR))],
run_after_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DR))],
xcom_key: Annotated[str | None, Query()] = None,
map_index: Annotated[int | None, Query(ge=-1)] = None,
) -> XComCollectionResponse:
Expand All @@ -138,8 +160,10 @@ def get_xcom_entries(
query = select(XComModel)
if dag_id != "~":
query = query.where(XComModel.dag_id == dag_id)
query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id)).options(
joinedload(XComModel.dag_run).joinedload(DR.dag_model)
query = (
query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id))
.join(DagModel, DR.dag_id == DagModel.dag_id)
.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
)

if task_id != "~":
Expand All @@ -153,7 +177,16 @@ def get_xcom_entries(

query, total_entries = paginated_select(
statement=query,
filters=[readable_xcom_filter],
filters=[
readable_xcom_filter,
xcom_key_pattern,
dag_display_name_pattern,
run_id_pattern,
task_id_pattern,
map_index_filter,
logical_date_range,
run_after_range,
],
offset=offset,
limit=limit,
session=session,
Expand Down
13 changes: 11 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,15 +574,24 @@ export const UseXcomServiceGetXcomEntryKeyFn = ({ dagId, dagRunId, deserialize,
export type XcomServiceGetXcomEntriesDefaultResponse = Awaited<ReturnType<typeof XcomService.getXcomEntries>>;
export type XcomServiceGetXcomEntriesQueryResult<TData = XcomServiceGetXcomEntriesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useXcomServiceGetXcomEntriesKey = "XcomServiceGetXcomEntries";
export const UseXcomServiceGetXcomEntriesKeyFn = ({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: {
export const UseXcomServiceGetXcomEntriesKeyFn = ({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: {
dagDisplayNamePattern?: string;
dagId: string;
dagRunId: string;
limit?: number;
logicalDateGte?: string;
logicalDateLte?: string;
mapIndex?: number;
mapIndexFilter?: number;
offset?: number;
runAfterGte?: string;
runAfterLte?: string;
runIdPattern?: string;
taskId: string;
taskIdPattern?: string;
xcomKey?: string;
}, queryKey?: Array<unknown>) => [useXcomServiceGetXcomEntriesKey, ...(queryKey ?? [{ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }])];
xcomKeyPattern?: string;
}, queryKey?: Array<unknown>) => [useXcomServiceGetXcomEntriesKey, ...(queryKey ?? [{ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }])];
export type TaskServiceGetTasksDefaultResponse = Awaited<ReturnType<typeof TaskService.getTasks>>;
export type TaskServiceGetTasksQueryResult<TData = TaskServiceGetTasksDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useTaskServiceGetTasksKey = "TaskServiceGetTasks";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,18 +1088,36 @@ export const ensureUseXcomServiceGetXcomEntryData = (queryClient: QueryClient, {
* @param data.mapIndex
* @param data.limit
* @param data.offset
* @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.mapIndexFilter
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.runAfterGte
* @param data.runAfterLte
* @returns XComCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseXcomServiceGetXcomEntriesData = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: {
export const ensureUseXcomServiceGetXcomEntriesData = (queryClient: QueryClient, { dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: {
dagDisplayNamePattern?: string;
dagId: string;
dagRunId: string;
limit?: number;
logicalDateGte?: string;
logicalDateLte?: string;
mapIndex?: number;
mapIndexFilter?: number;
offset?: number;
runAfterGte?: string;
runAfterLte?: string;
runIdPattern?: string;
taskId: string;
taskIdPattern?: string;
xcomKey?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) });
xcomKeyPattern?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) });
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
22 changes: 20 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1088,18 +1088,36 @@ export const prefetchUseXcomServiceGetXcomEntry = (queryClient: QueryClient, { d
* @param data.mapIndex
* @param data.limit
* @param data.offset
* @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.mapIndexFilter
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.runAfterGte
* @param data.runAfterLte
* @returns XComCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseXcomServiceGetXcomEntries = (queryClient: QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: {
export const prefetchUseXcomServiceGetXcomEntries = (queryClient: QueryClient, { dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: {
dagDisplayNamePattern?: string;
dagId: string;
dagRunId: string;
limit?: number;
logicalDateGte?: string;
logicalDateLte?: string;
mapIndex?: number;
mapIndexFilter?: number;
offset?: number;
runAfterGte?: string;
runAfterLte?: string;
runIdPattern?: string;
taskId: string;
taskIdPattern?: string;
xcomKey?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) });
xcomKeyPattern?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) });
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
22 changes: 20 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1088,18 +1088,36 @@ export const useXcomServiceGetXcomEntry = <TData = Common.XcomServiceGetXcomEntr
* @param data.mapIndex
* @param data.limit
* @param data.offset
* @param data.xcomKeyPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.dagDisplayNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.taskIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.mapIndexFilter
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.runAfterGte
* @param data.runAfterLte
* @returns XComCollectionResponse Successful Response
* @throws ApiError
*/
export const useXcomServiceGetXcomEntries = <TData = Common.XcomServiceGetXcomEntriesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }: {
export const useXcomServiceGetXcomEntries = <TData = Common.XcomServiceGetXcomEntriesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }: {
dagDisplayNamePattern?: string;
dagId: string;
dagRunId: string;
limit?: number;
logicalDateGte?: string;
logicalDateLte?: string;
mapIndex?: number;
mapIndexFilter?: number;
offset?: number;
runAfterGte?: string;
runAfterLte?: string;
runIdPattern?: string;
taskId: string;
taskIdPattern?: string;
xcomKey?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagId, dagRunId, limit, mapIndex, offset, taskId, xcomKey }) as TData, ...options });
xcomKeyPattern?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseXcomServiceGetXcomEntriesKeyFn({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }, queryKey), queryFn: () => XcomService.getXcomEntries({ dagDisplayNamePattern, dagId, dagRunId, limit, logicalDateGte, logicalDateLte, mapIndex, mapIndexFilter, offset, runAfterGte, runAfterLte, runIdPattern, taskId, taskIdPattern, xcomKey, xcomKeyPattern }) as TData, ...options });
/**
* Get Tasks
* Get tasks for DAG.
Expand Down
Loading