Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions airflow-core/src/airflow/api_fastapi/common/db/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ def generate_dag_with_latest_run_query(max_run_filters: list[BaseParam], order_b
has_max_run_filter = True
break

if has_max_run_filter or order_by.value in (
"last_run_state",
"last_run_start_date",
"-last_run_state",
"-last_run_start_date",
):
requested_order_by_set = set(order_by.value) if order_by.value is not None else set()
dag_run_order_by_set = set(
["last_run_state", "last_run_start_date", "-last_run_state", "-last_run_start_date"],
)

if has_max_run_filter or (requested_order_by_set & dag_run_order_by_set):
query = query.join(
max_run_id_query,
DagModel.dag_id == max_run_id_query.c.dag_id,
Expand Down
72 changes: 48 additions & 24 deletions airflow-core/src/airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,11 @@ def depends_search(
return depends_search


class SortParam(BaseParam[str]):
class SortParam(BaseParam[list[str]]):
"""Order result by the attribute."""

MAX_SORT_PARAMS = 10

def __init__(
self, allowed_attrs: list[str], model: Base, to_replace: dict[str, str | Column] | None = None
) -> None:
Expand All @@ -214,38 +216,56 @@ def to_orm(self, select: Select) -> Select:
raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}")

if self.value is None:
self.value = self.get_primary_key_string()

lstriped_orderby = self.value.lstrip("-")
column: Column | None = None
if self.to_replace:
replacement = self.to_replace.get(lstriped_orderby, lstriped_orderby)
if isinstance(replacement, str):
lstriped_orderby = replacement
else:
column = replacement
self.value = [self.get_primary_key_string()]

if (self.allowed_attrs and lstriped_orderby not in self.allowed_attrs) and column is None:
order_by_values = self.value
if len(order_by_values) > self.MAX_SORT_PARAMS:
raise HTTPException(
400,
f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model",
f"Ordering with more than {self.MAX_SORT_PARAMS} parameters is not allowed. Provided: {order_by_values}",
)
if column is None:
column = getattr(self.model, lstriped_orderby)

# MySQL does not support `nullslast`, and True/False ordering depends on the
# database implementation.
nullscheck = case((column.isnot(None), 0), else_=1)
columns: list[Column] = []
for order_by_value in order_by_values:
lstriped_orderby = order_by_value.lstrip("-")
column: Column | None = None
if self.to_replace:
replacement = self.to_replace.get(lstriped_orderby, lstriped_orderby)
if isinstance(replacement, str):
lstriped_orderby = replacement
else:
column = replacement

if (self.allowed_attrs and lstriped_orderby not in self.allowed_attrs) and column is None:
raise HTTPException(
400,
f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model",
)
if column is None:
column = getattr(self.model, lstriped_orderby)

# MySQL does not support `nullslast`, and True/False ordering depends on the
# database implementation.
nullscheck = case((column.isnot(None), 0), else_=1)

columns.append(nullscheck)
if order_by_value.startswith("-"):
columns.append(column.desc())
else:
columns.append(column.asc())

# Reset default sorting
select = select.order_by(None)

primary_key_column = self.get_primary_key_column()
# Always add a final discriminator to enforce deterministic ordering.
if order_by_values and order_by_values[0].startswith("-"):
columns.append(primary_key_column.desc())
else:
columns.append(primary_key_column.asc())

if self.value[0] == "-":
return select.order_by(nullscheck, column.desc(), primary_key_column.desc())
return select.order_by(nullscheck, column.asc(), primary_key_column.asc())
return select.order_by(*columns)

def get_primary_key_column(self) -> Column:
"""Get the primary key column of the model of SortParam object."""
Expand All @@ -260,8 +280,12 @@ def depends(cls, *args: Any, **kwargs: Any) -> Self:
raise NotImplementedError("Use dynamic_depends, depends not implemented.")

def dynamic_depends(self, default: str | None = None) -> Callable:
def inner(order_by: str = default or self.get_primary_key_string()) -> SortParam:
return self.set_value(self.get_primary_key_string() if order_by == "" else order_by)
def inner(
order_by: list[str] = Query(
default=[default] if default is not None else [self.get_primary_key_string()]
),
) -> SortParam:
return self.set_value(order_by)

return inner

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,11 @@ paths:
in: query
required: false
schema:
type: string
default: dag_id
type: array
items:
type: string
default:
- dag_id
title: Order By
- name: is_favorite
in: query
Expand Down Expand Up @@ -485,8 +488,11 @@ paths:
in: query
required: false
schema:
type: string
default: id
type: array
items:
type: string
default:
- id
title: Order By
- name: dag_id
in: query
Expand Down Expand Up @@ -560,8 +566,11 @@ paths:
in: query
required: false
schema:
type: string
default: id
type: array
items:
type: string
default:
- id
title: Order By
- name: run_after_gte
in: query
Expand Down Expand Up @@ -646,8 +655,11 @@ paths:
in: query
required: false
schema:
type: string
default: id
type: array
items:
type: string
default:
- id
title: Order By
- name: run_after_gte
in: query
Expand Down
Loading