Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,18 @@ def _transform_ti_states(states: list[str] | None) -> list[TaskInstanceState | N
)
),
]
QueryDagRunVersionFilter = Annotated[
FilterParam[list[int]],
Depends(
filter_param_factory(
DagVersion.version_number,
list[int],
FilterOptionEnum.ANY_EQUAL,
default_factory=list,
filter_name="dag_version",
)
),
]
QueryTITryNumberFilter = Annotated[
FilterParam[list[int]],
Depends(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2191,6 +2191,14 @@ paths:
items:
type: string
title: State
- name: dag_version
in: query
required: false
schema:
type: array
items:
type: integer
title: Dag Version
- name: order_by
in: query
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
OffsetFilter,
QueryDagRunRunTypesFilter,
QueryDagRunStateFilter,
QueryDagRunVersionFilter,
QueryLimit,
QueryOffset,
Range,
Expand Down Expand Up @@ -80,6 +81,7 @@
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.dag_version import DagVersion
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

Expand Down Expand Up @@ -318,6 +320,7 @@ def get_dag_runs(
update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", DagRun))],
run_type: QueryDagRunRunTypesFilter,
state: QueryDagRunStateFilter,
dag_version: QueryDagRunVersionFilter,
order_by: Annotated[
SortParam,
Depends(
Expand Down Expand Up @@ -360,6 +363,10 @@ def get_dag_runs(
get_latest_version_of_dag(dag_bag, dag_id, session) # Check if the DAG exists.
query = query.filter(DagRun.dag_id == dag_id).options(joinedload(DagRun.dag_model))

# Add join with DagVersion if dag_version filter is active
if dag_version.value:
query = query.join(DagVersion, DagRun.created_dag_version_id == DagVersion.id)

dag_run_select, total_entries = paginated_select(
statement=query,
filters=[
Expand All @@ -370,6 +377,7 @@ def get_dag_runs(
update_at_range,
state,
run_type,
dag_version,
readable_dag_runs_filter,
run_id_pattern,
triggering_user_name_pattern,
Expand Down
5 changes: 3 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn = ({ dagId, dagRunId }:
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData = DagRunServiceGetDagRunsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
dagId: string;
dagVersion?: number[];
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -171,7 +172,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({ dagId, endDateGt, endDateGte,
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey ?? [{ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }])];
export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse = Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData = DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
export const useDagRunServiceWaitDagRunUntilFinishedKey = "DagRunServiceWaitDagRunUntilFinished";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ export const ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
* @param data.updatedAtLt
* @param data.runType
* @param data.state
* @param data.dagVersion
* @param data.orderBy
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, { dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
dagId: string;
dagVersion?: number[];
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -320,7 +322,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, {
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
Expand Down
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (queryClient: Quer
* @param data.updatedAtLt
* @param data.runType
* @param data.state
* @param data.dagVersion
* @param data.orderBy
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
dagId: string;
dagVersion?: number[];
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -320,7 +322,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
Expand Down
6 changes: 4 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ export const useDagRunServiceGetUpstreamAssetEvents = <TData = Common.DagRunServ
* @param data.updatedAtLt
* @param data.runType
* @param data.state
* @param data.dagVersion
* @param data.orderBy
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
export const useDagRunServiceGetDagRuns = <TData = Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
export const useDagRunServiceGetDagRuns = <TData = Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }: {
dagId: string;
dagVersion?: number[];
endDateGt?: string;
endDateGte?: string;
endDateLt?: string;
Expand Down Expand Up @@ -320,7 +322,7 @@ export const useDagRunServiceGetDagRuns = <TData = Common.DagRunServiceGetDagRun
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options });
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
Expand Down
Loading