Skip to content
Merged
108 changes: 89 additions & 19 deletions invokeai/app/api/routers/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from fastapi.routing import APIRouter
from pydantic import BaseModel

from invokeai.app.api.auth_dependencies import CurrentUser
from invokeai.app.api.auth_dependencies import AdminUser, CurrentUser
from invokeai.app.api.dependencies import ApiDependencies
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import (
Expand Down Expand Up @@ -177,9 +177,10 @@ async def get_queue_items_by_item_ids(
responses={200: {"model": SessionProcessorStatus}},
)
async def resume(
current_user: AdminUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
"""Resumes session processor"""
"""Resumes session processor. Admin only."""
try:
return ApiDependencies.invoker.services.session_processor.resume()
except Exception as e:
Expand All @@ -192,9 +193,10 @@ async def resume(
responses={200: {"model": SessionProcessorStatus}},
)
async def Pause(
current_user: AdminUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatus:
"""Pauses session processor"""
"""Pauses session processor. Admin only."""
try:
return ApiDependencies.invoker.services.session_processor.pause()
except Exception as e:
Expand All @@ -207,11 +209,16 @@ async def Pause(
responses={200: {"model": CancelAllExceptCurrentResult}},
)
async def cancel_all_except_current(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> CancelAllExceptCurrentResult:
"""Immediately cancels all queue items except in-processing items"""
"""Immediately cancels all queue items except in-processing items. Non-admin users can only cancel their own items."""
try:
return ApiDependencies.invoker.services.session_queue.cancel_all_except_current(queue_id=queue_id)
# Admin users can cancel all items, non-admin users can only cancel their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.cancel_all_except_current(
queue_id=queue_id, user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while canceling all except current: {e}")

Expand All @@ -222,11 +229,16 @@ async def cancel_all_except_current(
responses={200: {"model": DeleteAllExceptCurrentResult}},
)
async def delete_all_except_current(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> DeleteAllExceptCurrentResult:
"""Immediately deletes all queue items except in-processing items"""
"""Immediately deletes all queue items except in-processing items. Non-admin users can only delete their own items."""
try:
return ApiDependencies.invoker.services.session_queue.delete_all_except_current(queue_id=queue_id)
# Admin users can delete all items, non-admin users can only delete their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.delete_all_except_current(
queue_id=queue_id, user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while deleting all except current: {e}")

Expand All @@ -237,13 +249,16 @@ async def delete_all_except_current(
responses={200: {"model": CancelByBatchIDsResult}},
)
async def cancel_by_batch_ids(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
batch_ids: list[str] = Body(description="The list of batch_ids to cancel all queue items for", embed=True),
) -> CancelByBatchIDsResult:
"""Immediately cancels all queue items from the given batch ids"""
"""Immediately cancels all queue items from the given batch ids. Non-admin users can only cancel their own items."""
try:
# Admin users can cancel all items, non-admin users can only cancel their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.cancel_by_batch_ids(
queue_id=queue_id, batch_ids=batch_ids
queue_id=queue_id, batch_ids=batch_ids, user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while canceling by batch id: {e}")
Expand All @@ -255,13 +270,16 @@ async def cancel_by_batch_ids(
responses={200: {"model": CancelByDestinationResult}},
)
async def cancel_by_destination(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
destination: str = Query(description="The destination to cancel all queue items for"),
) -> CancelByDestinationResult:
"""Immediately cancels all queue items with the given origin"""
"""Immediately cancels all queue items with the given destination. Non-admin users can only cancel their own items."""
try:
# Admin users can cancel all items, non-admin users can only cancel their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.cancel_by_destination(
queue_id=queue_id, destination=destination
queue_id=queue_id, destination=destination, user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while canceling by destination: {e}")
Expand All @@ -273,12 +291,28 @@ async def cancel_by_destination(
responses={200: {"model": RetryItemsResult}},
)
async def retry_items_by_id(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
item_ids: list[int] = Body(description="The queue item ids to retry"),
) -> RetryItemsResult:
"""Immediately cancels all queue items with the given origin"""
"""Retries the given queue items. Users can only retry their own items unless they are an admin."""
try:
# Check authorization: user must own all items or be an admin
if not current_user.is_admin:
for item_id in item_ids:
try:
queue_item = ApiDependencies.invoker.services.session_queue.get_queue_item(item_id)
if queue_item.user_id != current_user.user_id:
raise HTTPException(
status_code=403, detail=f"You do not have permission to retry queue item {item_id}"
)
except SessionQueueItemNotFoundError:
# Skip items that don't exist - they will be handled by retry_items_by_id
continue

return ApiDependencies.invoker.services.session_queue.retry_items_by_id(queue_id=queue_id, item_ids=item_ids)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while retrying queue items: {e}")

Expand All @@ -291,15 +325,23 @@ async def retry_items_by_id(
},
)
async def clear(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> ClearResult:
"""Clears the queue entirely, immediately canceling the currently-executing session"""
"""Clears the queue entirely. If there's a currently-executing item, users can only cancel it if they own it or are an admin."""
try:
queue_item = ApiDependencies.invoker.services.session_queue.get_current(queue_id)
if queue_item is not None:
# Check authorization for canceling the current item
if queue_item.user_id != current_user.user_id and not current_user.is_admin:
raise HTTPException(
status_code=403, detail="You do not have permission to cancel the currently executing queue item"
)
ApiDependencies.invoker.services.session_queue.cancel_queue_item(queue_item.item_id)
clear_result = ApiDependencies.invoker.services.session_queue.clear(queue_id)
return clear_result
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while clearing queue: {e}")

Expand All @@ -312,11 +354,14 @@ async def clear(
},
)
async def prune(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> PruneResult:
"""Prunes all completed or errored queue items"""
"""Prunes all completed or errored queue items. Non-admin users can only prune their own items."""
try:
return ApiDependencies.invoker.services.session_queue.prune(queue_id)
# Admin users can prune all items, non-admin users can only prune their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.prune(queue_id, user_id=user_id)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while pruning queue: {e}")

Expand Down Expand Up @@ -423,12 +468,24 @@ async def get_queue_item(
operation_id="delete_queue_item",
)
async def delete_queue_item(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
item_id: int = Path(description="The queue item to delete"),
) -> None:
"""Deletes a queue item"""
"""Deletes a queue item. Users can only delete their own items unless they are an admin."""
try:
# Get the queue item to check ownership
queue_item = ApiDependencies.invoker.services.session_queue.get_queue_item(item_id)

# Check authorization: user must own the item or be an admin
if queue_item.user_id != current_user.user_id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="You do not have permission to delete this queue item")

ApiDependencies.invoker.services.session_queue.delete_queue_item(item_id)
except SessionQueueItemNotFoundError:
raise HTTPException(status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while deleting queue item: {e}")

Expand All @@ -441,14 +498,24 @@ async def delete_queue_item(
},
)
async def cancel_queue_item(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to perform this operation on"),
item_id: int = Path(description="The queue item to cancel"),
) -> SessionQueueItem:
"""Deletes a queue item"""
"""Cancels a queue item. Users can only cancel their own items unless they are an admin."""
try:
# Get the queue item to check ownership
queue_item = ApiDependencies.invoker.services.session_queue.get_queue_item(item_id)

# Check authorization: user must own the item or be an admin
if queue_item.user_id != current_user.user_id and not current_user.is_admin:
raise HTTPException(status_code=403, detail="You do not have permission to cancel this queue item")

return ApiDependencies.invoker.services.session_queue.cancel_queue_item(item_id)
except SessionQueueItemNotFoundError:
raise HTTPException(status_code=404, detail=f"Queue item with id {item_id} not found in queue {queue_id}")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while canceling queue item: {e}")

Expand Down Expand Up @@ -477,13 +544,16 @@ async def counts_by_destination(
responses={200: {"model": DeleteByDestinationResult}},
)
async def delete_by_destination(
current_user: CurrentUser,
queue_id: str = Path(description="The queue id to query"),
destination: str = Path(description="The destination to query"),
) -> DeleteByDestinationResult:
"""Deletes all items with the given destination"""
"""Deletes all items with the given destination. Non-admin users can only delete their own items."""
try:
# Admin users can delete all items, non-admin users can only delete their own
user_id = None if current_user.is_admin else current_user.user_id
return ApiDependencies.invoker.services.session_queue.delete_by_destination(
queue_id=queue_id, destination=destination
queue_id=queue_id, destination=destination, user_id=user_id
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Unexpected error while deleting by destination: {e}")
30 changes: 18 additions & 12 deletions invokeai/app/services/session_queue/session_queue_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ def clear(self, queue_id: str) -> ClearResult:
pass

@abstractmethod
def prune(self, queue_id: str) -> PruneResult:
"""Deletes all completed and errored session queue items"""
def prune(self, queue_id: str, user_id: Optional[str] = None) -> PruneResult:
"""Deletes all completed and errored session queue items. If user_id is provided, only prunes items owned by that user."""
pass

@abstractmethod
Expand Down Expand Up @@ -110,18 +110,24 @@ def fail_queue_item(
pass

@abstractmethod
def cancel_by_batch_ids(self, queue_id: str, batch_ids: list[str]) -> CancelByBatchIDsResult:
"""Cancels all queue items with matching batch IDs"""
def cancel_by_batch_ids(
self, queue_id: str, batch_ids: list[str], user_id: Optional[str] = None
) -> CancelByBatchIDsResult:
"""Cancels all queue items with matching batch IDs. If user_id is provided, only cancels items owned by that user."""
pass

@abstractmethod
def cancel_by_destination(self, queue_id: str, destination: str) -> CancelByDestinationResult:
"""Cancels all queue items with the given batch destination"""
def cancel_by_destination(
self, queue_id: str, destination: str, user_id: Optional[str] = None
) -> CancelByDestinationResult:
"""Cancels all queue items with the given batch destination. If user_id is provided, only cancels items owned by that user."""
pass

@abstractmethod
def delete_by_destination(self, queue_id: str, destination: str) -> DeleteByDestinationResult:
"""Deletes all queue items with the given batch destination"""
def delete_by_destination(
self, queue_id: str, destination: str, user_id: Optional[str] = None
) -> DeleteByDestinationResult:
"""Deletes all queue items with the given batch destination. If user_id is provided, only deletes items owned by that user."""
pass

@abstractmethod
Expand All @@ -130,13 +136,13 @@ def cancel_by_queue_id(self, queue_id: str) -> CancelByQueueIDResult:
pass

@abstractmethod
def cancel_all_except_current(self, queue_id: str) -> CancelAllExceptCurrentResult:
"""Cancels all queue items except in-progress items"""
def cancel_all_except_current(self, queue_id: str, user_id: Optional[str] = None) -> CancelAllExceptCurrentResult:
"""Cancels all queue items except in-progress items. If user_id is provided, only cancels items owned by that user."""
pass

@abstractmethod
def delete_all_except_current(self, queue_id: str) -> DeleteAllExceptCurrentResult:
"""Deletes all queue items except in-progress items"""
def delete_all_except_current(self, queue_id: str, user_id: Optional[str] = None) -> DeleteAllExceptCurrentResult:
"""Deletes all queue items except in-progress items. If user_id is provided, only deletes items owned by that user."""
pass

@abstractmethod
Expand Down
Loading
Loading