-
Notifications
You must be signed in to change notification settings - Fork 159
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor(BA-502): Add the skeleton interface of vfolder CRUD handlers…
… in storage-proxy (#3434)
- Loading branch information
Showing
9 changed files
with
636 additions
and
252 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
python_sources() |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,108 +1,88 @@ | ||
from typing import AsyncContextManager, Type, TypeVar, cast | ||
|
||
import weakref | ||
import uuid | ||
|
||
from aiohttp import web | ||
|
||
|
||
from ai.backend.storage.api.manager import token_auth_middleware | ||
from ai.backend.storage.api.vfolder.manager_service import VFolderService | ||
from ai.backend.storage.context import RootContext, PrivateContext | ||
|
||
|
||
T = TypeVar("T") | ||
from ai.backend.storage.api.vfolder.types import ( | ||
QuotaConfigModel, | ||
QuotaIDModel, | ||
VFolderCloneModel, | ||
VFolderIDModel, | ||
VFolderInfoRequestModel, | ||
VolumeIDModel, | ||
) | ||
|
||
|
||
class VFolderHandler: | ||
def __init__(self, storage_service: VFolderService) -> None: | ||
self.storage_service = storage_service | ||
|
||
async def get_volume(self, request: web.Request) -> web.Response: | ||
return web.Response(text="Volume info") | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = VolumeIDModel(**data) | ||
result = await self.storage_service.get_volume(req) | ||
return web.json_response(result) | ||
|
||
async def get_volumes(self, request: web.Request) -> web.Response: | ||
return web.Response(text="Volumes list") | ||
result = await self.storage_service.get_volumes() | ||
# Assume that the volume_dict is a dictionary of VolumeInfoModel objects | ||
volumes_dict = result.volumes | ||
volumes_dict = {k: v for k, v in volumes_dict.items()} | ||
return web.json_response(volumes_dict) | ||
|
||
async def create_quota_scope(self, request: web.Request) -> web.Response: | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = QuotaConfigModel(**data) | ||
await self.storage_service.create_quota_scope(req) | ||
return web.Response(status=204) | ||
|
||
async def get_quota_scope(self, request: web.Request) -> web.Response: | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = QuotaIDModel(**data) | ||
result = await self.storage_service.get_quota_scope(req) | ||
return web.json_response(result) | ||
|
||
async def update_quota_scope(self, request: web.Request) -> web.Response: | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = QuotaConfigModel(**data) | ||
await self.storage_service.update_quota_scope(req) | ||
return web.Response(status=204) | ||
|
||
async def delete_quota_scope(self, request: web.Request) -> web.Response: | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = QuotaIDModel(**data) | ||
await self.storage_service.delete_quota_scope(req) | ||
return web.Response(status=204) | ||
|
||
async def create_vfolder(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolder created", status=204) | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = VFolderIDModel(**data) | ||
await self.storage_service.create_vfolder(req) | ||
return web.Response(status=204) | ||
|
||
async def clone_vfolder(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolder cloned", status=204) | ||
|
||
async def get_vfolders(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolders list") | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = VFolderCloneModel(**data) | ||
await self.storage_service.clone_vfolder(req) | ||
return web.Response(status=204) | ||
|
||
async def get_vfolder_info(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolder info") | ||
|
||
async def update_vfolder_options(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolder updated") | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = VFolderInfoRequestModel(**data) | ||
result = await self.storage_service.get_vfolder_info(req) | ||
return web.json_response(result) | ||
|
||
async def delete_vfolder(self, request: web.Request) -> web.Response: | ||
return web.Response(text="VFolder deleted", status=202) | ||
|
||
# async def _extract_params(self, request: web.Request, schema: Type[T]) -> AsyncContextManager[T]: | ||
# """ | ||
# pydantic에서 자주 활용되는 방식 찾아보기 | ||
# middleware에서 처리하는 방식도 고려해보기""" | ||
# data = await request.json() | ||
# try: | ||
# params = schema(**data) | ||
# except TypeError as e: | ||
# raise web.HTTPBadRequest( # Backend.AI의 Exception 패키지 확인하기 | ||
# reason=f"Invalid request parameters: {str(e)}" | ||
# ) | ||
# # 데이터 검증을 위에서 같이 진행해서 check_params 제외함 | ||
# return cast(AsyncContextManager[T], params) | ||
|
||
|
||
async def init_manager_app(ctx: RootContext) -> web.Application: | ||
storage_service = VFolderService(ctx) | ||
storage_handler = VFolderHandler(storage_service) | ||
|
||
app = web.Application( | ||
middlewares=[ | ||
token_auth_middleware, | ||
], | ||
) | ||
app["ctx"] = ctx | ||
app["app_ctx"] = PrivateContext( | ||
deletion_tasks=weakref.WeakValueDictionary()) | ||
|
||
# Volume | ||
app.router.add_route( | ||
"POST", "/volumes/{volume_id}", storage_handler.get_volume) | ||
app.router.add_route( | ||
"GET", "/volumes", storage_handler.get_volumes) | ||
# VFolder | ||
app.router.add_route( | ||
"POST", "/volumes/{volume_id}/vfolders/", storage_handler.create_vfolder | ||
) | ||
app.router.add_route( | ||
"POST", "/volumes/{volume_id}/vfolders/{vfolder_id}/clone", storage_handler.clone_vfolder) | ||
app.router.add_route( | ||
"GET", "/volumes/{volume_id}/vfolders", storage_handler.get_vfolders | ||
) | ||
app.router.add_route( | ||
"GET", "/volumes/{volume_id}/vfolders/{vfolder_id}", storage_handler.get_vfolder_info | ||
) | ||
app.router.add_route( | ||
"PUT", "/volumes/{volume_id}/vfolders/{vfolder_id}", storage_handler.update_vfolder_options | ||
) | ||
app.router.add_route( | ||
"DELETE", "/volumes/{volume_id}/vfolders/{vfolder_id}", storage_handler.delete_vfolder | ||
) | ||
|
||
# evd = ctx.event_dispatcher | ||
# evd.subscribe( | ||
# DoVolumeMountEvent, | ||
# storage_service.handle_volume_mount, | ||
# name="storage.volume.mount" | ||
# ) | ||
# evd.subscribe( | ||
# DoVolumeUnmountEvent, | ||
# storage_service.handle_volume_umount, | ||
# name="storage.volume.umount" | ||
# ) | ||
|
||
return app | ||
data = await request.json() | ||
data["volume_id"] = uuid.UUID(data["volume_id"]) | ||
req = VFolderIDModel(**data) | ||
await self.storage_service.delete_vfolder(req) | ||
return web.Response(status=202) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,83 +1,90 @@ | ||
import asyncio | ||
from pathlib import Path | ||
from typing import AsyncIterator, Mapping, Protocol, Dict, Any | ||
import weakref | ||
|
||
from ai.backend.common.events import DoVolumeMountEvent, DoVolumeUnmountEvent | ||
from ai.backend.common.types import VFolderID | ||
from ai.backend.storage.abc import AbstractVolume | ||
from ai.backend.storage.api.vfolder.types import VFolderData, VolumeBaseData | ||
from ai.backend.storage.context import RootContext | ||
from ai.backend.storage.types import VolumeInfo | ||
from pathlib import Path, PurePath | ||
from typing import Protocol | ||
|
||
from ai.backend.common.types import BinarySize | ||
from ai.backend.storage.api.vfolder.types import ( | ||
QuotaConfigModel, | ||
QuotaIDModel, | ||
QuotaScopeInfoModel, | ||
VFolderCloneModel, | ||
VFolderIDModel, | ||
VFolderInfoModel, | ||
VFolderInfoRequestModel, | ||
VolumeIDModel, | ||
VolumeInfoListModel, | ||
VolumeInfoModel, | ||
) | ||
from ai.backend.storage.types import CapacityUsage, TreeUsage | ||
|
||
|
||
class VFolderServiceProtocol(Protocol): | ||
async def get_volume(self, volume_data: VolumeBaseData) -> AsyncIterator[AbstractVolume]: | ||
async def get_volume(self, volume_data: VolumeIDModel) -> VolumeInfoModel: | ||
"""by volume_id""" | ||
... | ||
|
||
async def get_volumes(self) -> Mapping[str, VolumeInfo]: | ||
... | ||
async def get_volumes(self) -> VolumeInfoListModel: ... | ||
|
||
async def create_vfolder(self, volume_id: str, vfolder_id: VFolderID, options: VFolderOptions) -> None: | ||
... | ||
async def create_quota_scope(self, quota_config_data: QuotaConfigModel) -> None: ... | ||
|
||
async def clone_vfolder(self, volume_id: str, vfolder_id: VFolderID, new_vfolder_id: VFolderID, options: VFolderOptions) -> None: | ||
... | ||
async def get_quota_scope(self, quota_data: QuotaIDModel) -> QuotaScopeInfoModel: ... | ||
|
||
async def get_vfolders(self, volume_id: str) -> list[Dict[str, Any]]: | ||
... | ||
async def update_quota_scope(self, quota_config_data: QuotaConfigModel) -> None: ... | ||
|
||
async def get_vfolder_info(self, volume_id: str, vfolder_id: VFolderID) -> Dict[str, Any]: | ||
async def delete_quota_scope(self, quota_data: QuotaIDModel) -> None: | ||
"""Previous: unset_quota""" | ||
... | ||
|
||
"""TODO: options type 정의 필요 | ||
create 시와 필드가 겹친다면 따로 정의 X""" | ||
async def create_vfolder(self, vfolder_data: VFolderIDModel) -> None: ... | ||
|
||
async def update_vfolder_options(self, volume_id: str, vfolder_id: VFolderID, options: ...) -> None: | ||
... | ||
async def clone_vfolder(self, vfolder_clone_data: VFolderCloneModel) -> None: ... | ||
|
||
async def delete_vfolder(self, volume_id: str, vfolder_id: VFolderID) -> None: | ||
async def get_vfolder_info(self, vfolder_info: VFolderInfoRequestModel) -> VFolderInfoModel: | ||
# Integration: vfolder_mount, metadata, vfolder_usage, vfolder_used_bytes, vfolder_fs_usage | ||
... | ||
|
||
async def delete_vfolder(self, vfolder_data: VFolderIDModel) -> None: ... | ||
|
||
class VFolderService(VFolderServiceProtocol): | ||
def __init__(self, ctx: RootContext) -> None: | ||
self.ctx = ctx | ||
|
||
async def get_volume(self, volume_data: VolumeBaseData) -> AsyncIterator[AbstractVolume]: | ||
... | ||
|
||
async def get_volumes(self) -> Mapping[str, VolumeInfo]: | ||
... | ||
class VFolderService: | ||
async def get_volume(self, volume_data: VolumeIDModel) -> VolumeInfoModel: | ||
return VolumeInfoModel( | ||
volume_id=volume_data.volume_id, | ||
backend="default-backend", | ||
path=Path("/default/path"), | ||
fsprefix=PurePath("/fsprefix"), | ||
capabilities=["read", "write"], | ||
options={"option1": "value1"}, | ||
) | ||
|
||
async def handle_volume_mount(self, event: DoVolumeMountEvent) -> None: | ||
... | ||
async def get_volumes(self) -> VolumeInfoListModel: | ||
return VolumeInfoListModel(volumes={}) | ||
|
||
async def handle_volume_umount(self, event: DoVolumeUnmountEvent) -> None: | ||
... | ||
async def create_quota_scope(self, quota_config_data: QuotaConfigModel) -> None: | ||
return None | ||
|
||
async def create_vfolder(self, vfolder_data: VFolderData) -> None: | ||
... | ||
async def get_quota_scope(self, quota_data: QuotaIDModel) -> QuotaScopeInfoModel: | ||
return QuotaScopeInfoModel(used_bytes=0, limit_bytes=0) | ||
|
||
async def clone_vfolder(self, vfolder_data: VFolderData, new_vfolder_id: VFolderID) -> None: | ||
... | ||
async def update_quota_scope(self, quota_config_data: QuotaConfigModel) -> None: | ||
return None | ||
|
||
async def get_vfolders(self, volume_id: str) -> list[Dict[str, Any]]: | ||
... | ||
async def delete_quota_scope(self, quota_data: QuotaIDModel) -> None: | ||
return None | ||
|
||
async def get_vfolder_info(self, volume_id: str, vfolder_id: VFolderID) -> Dict[str, Any]: | ||
... | ||
async def create_vfolder(self, vfolder_data: VFolderIDModel) -> None: | ||
return None | ||
|
||
async def update_vfolder_options(self, volume_id: str, vfolder_id: VFolderID, options: ...) -> None: | ||
... | ||
async def clone_vfolder(self, vfolder_clone_data: VFolderCloneModel) -> None: | ||
return None | ||
|
||
async def _delete_vfolder( | ||
self, | ||
vfolder_data: VFolderData, | ||
task_map: weakref.WeakValueDictionary[VFolderID, asyncio.Task] | ||
) -> None: | ||
... | ||
async def get_vfolder_info(self, vfolder_info: VFolderInfoRequestModel) -> VFolderInfoModel: | ||
return VFolderInfoModel( | ||
vfolder_mount=Path("/mount/point"), | ||
vfolder_metadata=b"", | ||
vfolder_usage=TreeUsage(file_count=0, used_bytes=0), | ||
vfolder_used_bytes=BinarySize(0), | ||
vfolder_fs_usage=CapacityUsage(used_bytes=0, capacity_bytes=0), | ||
) | ||
|
||
async def delete_vfolder(self, vfolder_data: VFolderData) -> None: | ||
... | ||
async def delete_vfolder(self, vfolder_data: VFolderIDModel) -> None: | ||
return None |
Oops, something went wrong.