Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
652ffec
Fix breeze api-server - missed it during config move
jedcunningham Feb 26, 2025
8a7d13b
Merge branch 'main' of https://github.com/apache/airflow
rawwar Feb 26, 2025
3287db5
Merge branch 'main' of https://github.com/apache/airflow
rawwar Feb 26, 2025
5e164be
Merge branch 'main' of https://github.com/apache/airflow
rawwar Feb 27, 2025
44e9883
init asset
rawwar Feb 27, 2025
9ab9066
update rest
rawwar Feb 27, 2025
7b9898b
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Feb 27, 2025
4976819
add commit session after creating dags
rawwar Feb 27, 2025
1b69bc0
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Feb 27, 2025
e3b411b
fix issue with time_machine
rawwar Feb 27, 2025
113f907
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Feb 27, 2025
39a35b1
session commit in make_dag_with_multiple_versions
rawwar Feb 27, 2025
1183e20
fix ci failure
rawwar Feb 27, 2025
67b009b
add unauthorized user tests
rawwar Feb 28, 2025
c5e805b
fix
rawwar Feb 28, 2025
f1b9de1
fix
rawwar Feb 28, 2025
c4aa05f
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Feb 28, 2025
eb92a90
move dag.sync_to_db to end
rawwar Feb 28, 2025
c667c6b
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Feb 28, 2025
978b9cb
pr feedback
rawwar Feb 28, 2025
92d6b16
fix
rawwar Feb 28, 2025
89b677e
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Mar 1, 2025
fd75ec2
replace uri with id in AssetDetails
rawwar Mar 1, 2025
2cf9699
remove asset alias tests
rawwar Mar 1, 2025
83bce17
remove unused session
rawwar Mar 1, 2025
9ae6b95
pull from main
rawwar Mar 3, 2025
f0fc5ac
add newsfragment
rawwar Mar 3, 2025
09a5135
pull from main
rawwar Mar 3, 2025
de5ab58
rename newsfragment
rawwar Mar 5, 2025
4c2a014
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Mar 5, 2025
8835427
use lambda
rawwar Mar 5, 2025
8527972
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar Mar 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ paths:
summary: Get Assets
description: Get assets.
operationId: get_assets
security:
- OAuth2PasswordBearer: []
parameters:
- name: limit
in: query
Expand Down Expand Up @@ -728,6 +730,8 @@ paths:
summary: Get Asset Events
description: Get asset events.
operationId: get_asset_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: limit
in: query
Expand Down Expand Up @@ -847,6 +851,8 @@ paths:
summary: Create Asset Event
description: Create asset events.
operationId: create_asset_event
security:
- OAuth2PasswordBearer: []
requestBody:
required: true
content:
Expand Down Expand Up @@ -891,6 +897,8 @@ paths:
summary: Materialize Asset
description: Materialize an asset by triggering a DAG run that produces it.
operationId: materialize_asset
security:
- OAuth2PasswordBearer: []
parameters:
- name: asset_id
in: path
Expand Down Expand Up @@ -942,6 +950,8 @@ paths:
summary: Get Asset Queued Events
description: Get queued asset events for an asset.
operationId: get_asset_queued_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: asset_id
in: path
Expand Down Expand Up @@ -994,6 +1004,8 @@ paths:
summary: Delete Asset Queued Events
description: Delete queued asset events for an asset.
operationId: delete_asset_queued_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: asset_id
in: path
Expand All @@ -1009,6 +1021,14 @@ paths:
- type: string
- type: 'null'
title: Before
- name: dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Dag Id
responses:
'204':
description: Successful Response
Expand Down Expand Up @@ -1043,6 +1063,8 @@ paths:
summary: Get Asset
description: Get an asset.
operationId: get_asset
security:
- OAuth2PasswordBearer: []
parameters:
- name: asset_id
in: path
Expand Down Expand Up @@ -1088,12 +1110,16 @@ paths:
summary: Get Dag Asset Queued Events
description: Get queued asset events for a DAG.
operationId: get_dag_asset_queued_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: before
in: query
Expand Down Expand Up @@ -1139,12 +1165,16 @@ paths:
- Asset
summary: Delete Dag Asset Queued Events
operationId: delete_dag_asset_queued_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: before
in: query
Expand Down Expand Up @@ -1194,12 +1224,16 @@ paths:
summary: Get Dag Asset Queued Event
description: Get a queued asset event for a DAG.
operationId: get_dag_asset_queued_event
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: asset_id
in: path
Expand Down Expand Up @@ -1252,12 +1286,16 @@ paths:
summary: Delete Dag Asset Queued Event
description: Delete a queued asset event for a DAG.
operationId: delete_dag_asset_queued_event
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
anyOf:
- type: string
- type: 'null'
title: Dag Id
- name: asset_id
in: path
Expand Down Expand Up @@ -2223,6 +2261,8 @@ paths:
description: If dag run is asset-triggered, return the asset events that triggered
it.
operationId: get_upstream_asset_events
security:
- OAuth2PasswordBearer: []
parameters:
- name: dag_id
in: path
Expand Down
29 changes: 24 additions & 5 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
)
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_asset, requires_access_dag
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.assets.manager import asset_manager
from airflow.models.asset import (
Expand Down Expand Up @@ -91,6 +92,7 @@ def _generate_queued_event_where_clause(
@assets_router.get(
"/assets",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET"))],
)
def get_assets(
limit: QueryLimit,
Expand Down Expand Up @@ -173,6 +175,7 @@ def get_asset_alias(asset_alias_id: int, session: SessionDep):
@assets_router.get(
"/assets/events",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET"))],
)
def get_asset_events(
limit: QueryLimit,
Expand Down Expand Up @@ -232,7 +235,7 @@ def get_asset_events(
@assets_router.post(
"/assets/events",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(action_logging())],
dependencies=[Depends(requires_access_asset(method="POST")), Depends(action_logging())],
)
def create_asset_event(
body: CreateAssetEventsBody,
Expand All @@ -259,7 +262,7 @@ def create_asset_event(
@assets_router.post(
"/assets/{asset_id}/materialize",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[Depends(action_logging())],
dependencies=[Depends(requires_access_asset(method="POST")), Depends(action_logging())],
)
def materialize_asset(
asset_id: int,
Expand Down Expand Up @@ -306,6 +309,7 @@ def materialize_asset(
@assets_router.get(
"/assets/{asset_id}/queuedEvents",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET"))],
)
def get_asset_queued_events(
asset_id: int,
Expand Down Expand Up @@ -339,6 +343,7 @@ def get_asset_queued_events(
@assets_router.get(
"/assets/{asset_id}",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET"))],
)
def get_asset(
asset_id: int,
Expand All @@ -360,6 +365,7 @@ def get_asset(
@assets_router.get(
"/dags/{dag_id}/assets/queuedEvents",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET")), Depends(requires_access_dag(method="GET"))],
)
def get_dag_asset_queued_events(
dag_id: str,
Expand Down Expand Up @@ -389,6 +395,7 @@ def get_dag_asset_queued_events(
@assets_router.get(
"/dags/{dag_id}/assets/{asset_id}/queuedEvents",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(requires_access_asset(method="GET")), Depends(requires_access_dag(method="GET"))],
)
def get_dag_asset_queued_event(
dag_id: str,
Expand All @@ -413,7 +420,11 @@ def get_dag_asset_queued_event(
"/assets/{asset_id}/queuedEvents",
status_code=status.HTTP_204_NO_CONTENT,
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[Depends(action_logging())],
dependencies=[
Depends(requires_access_asset(method="DELETE")),
Depends(requires_access_dag(method="GET")),
Depends(action_logging()),
],
)
def delete_asset_queued_events(
asset_id: int,
Expand All @@ -440,7 +451,11 @@ def delete_asset_queued_events(
status.HTTP_404_NOT_FOUND,
]
),
dependencies=[Depends(action_logging())],
dependencies=[
Depends(requires_access_asset(method="DELETE")),
Depends(requires_access_dag(method="GET")),
Depends(action_logging()),
],
)
def delete_dag_asset_queued_events(
dag_id: str,
Expand All @@ -465,7 +480,11 @@ def delete_dag_asset_queued_events(
status.HTTP_404_NOT_FOUND,
]
),
dependencies=[Depends(action_logging())],
dependencies=[
Depends(requires_access_asset(method="DELETE")),
Depends(requires_access_dag(method="GET")),
Depends(action_logging()),
],
)
def delete_dag_asset_queued_event(
dag_id: str,
Expand Down
2 changes: 2 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/dag_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
TaskInstanceResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import requires_access_asset
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
from airflow.listeners.listener import get_listener_manager
Expand Down Expand Up @@ -189,6 +190,7 @@ def patch_dag_run(
status.HTTP_404_NOT_FOUND,
]
),
dependencies=[Depends(requires_access_asset(method="GET"))],
)
def get_upstream_asset_events(
dag_id: str, dag_run_id: str, session: SessionDep
Expand Down
17 changes: 17 additions & 0 deletions airflow/api_fastapi/core_api/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airflow.api_fastapi.app import get_auth_manager
from airflow.auth.managers.models.base_user import BaseUser
from airflow.auth.managers.models.resource_details import (
AssetDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
Expand Down Expand Up @@ -138,6 +139,22 @@ def inner(
return inner


def requires_access_asset(method: ResourceMethod) -> Callable:
def inner(
request: Request,
user: Annotated[BaseUser | None, Depends(get_user)] = None,
) -> None:
asset_id = request.path_params.get("asset_id")

_requires_access(
is_authorized_callback=lambda: get_auth_manager().is_authorized_asset(
method=method, details=AssetDetails(id=asset_id), user=user
),
)

return inner


def _requires_access(
*,
is_authorized_callback: Callable[[], bool],
Expand Down
2 changes: 1 addition & 1 deletion airflow/auth/managers/models/resource_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class DagDetails:
class AssetDetails:
"""Represents the details of an asset."""

uri: str | None = None
id: str | None = None


@dataclass
Expand Down
7 changes: 5 additions & 2 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4351,6 +4351,7 @@ export const useVariableServiceBulkVariables = <
* @param data The data for the request.
* @param data.assetId
* @param data.before
* @param data.dagId
* @returns void Successful Response
* @throws ApiError
*/
Expand All @@ -4366,6 +4367,7 @@ export const useAssetServiceDeleteAssetQueuedEvents = <
{
assetId: number;
before?: string;
dagId?: string;
},
TContext
>,
Expand All @@ -4378,11 +4380,12 @@ export const useAssetServiceDeleteAssetQueuedEvents = <
{
assetId: number;
before?: string;
dagId?: string;
},
TContext
>({
mutationFn: ({ assetId, before }) =>
AssetService.deleteAssetQueuedEvents({ assetId, before }) as unknown as Promise<TData>,
mutationFn: ({ assetId, before, dagId }) =>
AssetService.deleteAssetQueuedEvents({ assetId, before, dagId }) as unknown as Promise<TData>,
...options,
});
/**
Expand Down
2 changes: 2 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ export class AssetService {
* @param data The data for the request.
* @param data.assetId
* @param data.before
* @param data.dagId
* @returns void Successful Response
* @throws ApiError
*/
Expand All @@ -462,6 +463,7 @@ export class AssetService {
},
query: {
before: data.before,
dag_id: data.dagId,
},
errors: {
401: "Unauthorized",
Expand Down
Loading