feat: WebCam feature #583
Changes from all commits
backend/app/routes/face_clusters.py
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,16 @@ | ||
| import logging | ||
| from binascii import Error as Base64Error | ||
| import base64 | ||
| from typing import Annotated | ||
| import uuid | ||
| import os | ||
| from typing import Optional, List, Dict, Any | ||
| from pydantic import BaseModel | ||
| from app.config.settings import CONFIDENCE_PERCENT, DEFAULT_FACENET_MODEL | ||
| from fastapi import APIRouter, HTTPException, status | ||
| from fastapi import APIRouter, HTTPException, Query, status | ||
| from app.database.face_clusters import ( | ||
| db_get_cluster_by_id, | ||
| db_update_cluster, | ||
| db_get_all_clusters_with_face_counts, | ||
| db_get_images_by_cluster_id, # Add this import | ||
| ) | ||
| from app.database.faces import get_all_face_embeddings | ||
| from app.models.FaceDetector import FaceDetector | ||
| from app.models.FaceNet import FaceNet | ||
| from app.schemas.face_clusters import ( | ||
| RenameClusterRequest, | ||
| RenameClusterResponse, | ||
|
|
@@ -26,32 +23,8 @@ | |
| GetClusterImagesData, | ||
| ImageInCluster, | ||
| ) | ||
| from app.schemas.images import AddSingleImageRequest | ||
| from app.utils.FaceNet import FaceNet_util_cosine_similarity | ||
|
|
||
|
|
||
| class BoundingBox(BaseModel): | ||
| x: float | ||
| y: float | ||
| width: float | ||
| height: float | ||
|
|
||
|
|
||
| class ImageData(BaseModel): | ||
| id: str | ||
| path: str | ||
| folder_id: str | ||
| thumbnailPath: str | ||
| metadata: Dict[str, Any] | ||
| isTagged: bool | ||
| tags: Optional[List[str]] = None | ||
| bboxes: BoundingBox | ||
|
|
||
|
|
||
| class GetAllImagesResponse(BaseModel): | ||
| success: bool | ||
| message: str | ||
| data: List[ImageData] | ||
| from app.schemas.images import FaceSearchRequest, InputType | ||
| from app.utils.faceSearch import perform_face_search | ||
|
|
||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -236,67 +209,91 @@ def get_cluster_images(cluster_id: str): | |
| "/face-search", | ||
| responses={code: {"model": ErrorResponse} for code in [400, 500]}, | ||
| ) | ||
| def face_tagging(payload: AddSingleImageRequest): | ||
| image_path = payload.path | ||
| if not os.path.isfile(image_path): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="Invalid file path", | ||
| message="The provided path is not a valid file", | ||
| ).model_dump(), | ||
| ) | ||
|
|
||
| fd = FaceDetector() | ||
| fn = FaceNet(DEFAULT_FACENET_MODEL) | ||
| try: | ||
| matches = [] | ||
| image_id = str(uuid.uuid4()) | ||
| result = fd.detect_faces(image_id, image_path, forSearch=True) | ||
| if not result or result["num_faces"] == 0: | ||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message=f"Successfully retrieved {len(matches)} images", | ||
| data=[], | ||
| def face_tagging( | ||
| payload: FaceSearchRequest, | ||
| input_type: Annotated[ | ||
| InputType, Query(description="Choose input type: 'path' or 'base64'") | ||
| ] = InputType.path, | ||
| ): | ||
| image_path = None | ||
|
|
||
| if input_type == InputType.path: | ||
| local_file_path = payload.path | ||
|
|
||
| if not local_file_path: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="No Image path provided ", | ||
| message="image path is required.", | ||
| ).model_dump(), | ||
| ) | ||
| if not os.path.isfile(local_file_path): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="Invalid file path", | ||
| message="The provided path is not a valid file", | ||
| ).model_dump(), | ||
| ) | ||
| image_path = payload.path | ||
|
Comment on lines +220 to +241

Contributor (coderabbitai[bot])

Validate that only the expected field is provided. When input_type is 'path' but base64_data is also sent (or the reverse), the unexpected field is silently ignored, which can mask client errors. Add mutual exclusivity validation at the start of the function:

 def face_tagging(
     payload: FaceSearchRequest,
     input_type: Annotated[
         InputType, Query(description="Choose input type: 'path' or 'base64'")
     ] = InputType.path,
 ):
     image_path = None
+
+    # Validate mutual exclusivity
+    if input_type == InputType.path and payload.base64_data is not None:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ErrorResponse(
+                success=False,
+                error="Invalid request",
+                message="Cannot provide base64_data when input_type is 'path'.",
+            ).model_dump(),
+        )
+
+    if input_type == InputType.base64 and payload.path is not None:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=ErrorResponse(
+                success=False,
+                error="Invalid request",
+                message="Cannot provide path when input_type is 'base64'.",
+            ).model_dump(),
+        )
     if input_type == InputType.path:

Previous security concerns remain unaddressed. As noted in earlier reviews, the path input still lacks validation against an allowlist/configured media root, which could allow reading arbitrary filesystem paths.

Fix trailing space in error message. Line 228 contains a trailing space: error="No Image path provided ",
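One way to act on both points is to push the exclusivity rule into the request schema and gate paths against a configured media root. The sketch below is illustrative only: it assumes pydantic v2 and a hypothetical MEDIA_ROOT setting that does not appear in this PR, while the path and base64_data field names match how the endpoint reads the payload.

```python
# Hedged sketch: schema-level mutual exclusivity plus a media-root path check.
# MEDIA_ROOT is a hypothetical setting; pydantic v2 is assumed.
from pathlib import Path
from typing import Optional

from pydantic import BaseModel, model_validator

MEDIA_ROOT = Path("/data/pictopy/media")  # hypothetical configured media root


class FaceSearchRequest(BaseModel):
    path: Optional[str] = None
    base64_data: Optional[str] = None

    @model_validator(mode="after")
    def exactly_one_input(self) -> "FaceSearchRequest":
        # Reject requests that supply both fields or neither field.
        if bool(self.path) == bool(self.base64_data):
            raise ValueError("Provide exactly one of 'path' or 'base64_data'.")
        return self


def is_within_media_root(candidate: str, root: Path = MEDIA_ROOT) -> bool:
    """Resolve symlinks and '..' segments, then confirm the file stays under root."""
    resolved = Path(candidate).resolve()
    try:
        resolved.relative_to(root.resolve())
    except ValueError:
        return False
    return resolved.is_file()
```

Whether the validation lives on the schema or at the top of the route is a design choice for the maintainers; the schema-level variant keeps face_tagging focused on dispatching between the two input types.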
|
||
|
|
||
| process_face = result["processed_faces"][0] | ||
| new_embedding = fn.get_embedding(process_face) | ||
|
|
||
| images = get_all_face_embeddings() | ||
| if len(images) == 0: | ||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message=f"Successfully retrieved {len(matches)} images", | ||
| data=[], | ||
| elif input_type == InputType.base64: | ||
| base64_data = payload.base64_data | ||
| if not base64_data: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="No base64 data", | ||
| message="Base64 image data is required.", | ||
| ).model_dump(), | ||
| ) | ||
| else: | ||
| for image in images: | ||
| max_similarity = 0 | ||
| similarity = FaceNet_util_cosine_similarity( | ||
| new_embedding, image["embeddings"] | ||
| ) | ||
| max_similarity = max(max_similarity, similarity) | ||
| if max_similarity >= CONFIDENCE_PERCENT: | ||
| matches.append( | ||
| ImageData( | ||
| id=image["id"], | ||
| path=image["path"], | ||
| folder_id=image["folder_id"], | ||
| thumbnailPath=image["thumbnailPath"], | ||
| metadata=image["metadata"], | ||
| isTagged=image["isTagged"], | ||
| tags=image["tags"], | ||
| bboxes=image["bbox"], | ||
| ) | ||
| ) | ||
|
|
||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message=f"Successfully retrieved {len(matches)} images", | ||
| data=matches, | ||
| MAX_B64_LEN = 14_000_000 # 10MB | ||
| if len(base64_data) > MAX_B64_LEN: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="Payload too large", | ||
| message="Base64 image exceeds maximum allowed size.", | ||
| ).model_dump(), | ||
| ) | ||
| try: | ||
| image_bytes = base64.b64decode(base64_data.split(",")[-1]) | ||
| except (Base64Error, ValueError): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_400_BAD_REQUEST, | ||
| detail=ErrorResponse( | ||
| success=False, | ||
| error="Invalid base64 data", | ||
| message="The provided base64 image data is malformed or invalid.", | ||
| ).model_dump(), | ||
| ) | ||
|
||
|
|
||
| format_match = ( | ||
| base64_data.split(";")[0].split("/")[-1] if ";" in base64_data else "jpeg" | ||
| ) | ||
| extension = ( | ||
| format_match | ||
| if format_match in ["jpeg", "jpg", "png", "gif", "webp"] | ||
| else "jpeg" | ||
| ) | ||
| image_id = str(uuid.uuid4())[:8] | ||
| temp_dir = "temp_uploads" | ||
| os.makedirs(temp_dir, exist_ok=True) | ||
| local_image_path = os.path.join(temp_dir, f"{image_id}.{extension}") | ||
|
|
||
| with open(local_image_path, "wb") as f: | ||
| f.write(image_bytes) | ||
|
|
||
| image_path = local_image_path | ||
|
|
||
| try: | ||
| return perform_face_search(image_path) | ||
| finally: | ||
| fd.close() | ||
| fn.close() | ||
| if input_type == InputType.base64 and image_path and os.path.exists(image_path): | ||
| os.remove(image_path) | ||
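For context on how the new base64 branch is meant to be used (for example by a webcam capture in the frontend), a client call would look roughly like the sketch below. The host, port, and any router prefix are assumptions; only the /face-search path, the input_type query parameter, and the base64_data field come from this diff.

```python
# Hedged client sketch: encode an image and query the face-search route.
# API_URL is an assumed mount point; adjust to the app's real router prefix.
import base64
import requests

API_URL = "http://localhost:8000/face-search"  # assumed mount point

with open("webcam_frame.jpg", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("ascii")

# A data-URL prefix is accepted: the endpoint splits on "," to recover the raw
# payload and on ";" / "/" to guess the file extension.
payload = {"base64_data": f"data:image/jpeg;base64,{encoded}"}

resp = requests.post(API_URL, params={"input_type": "base64"}, json=payload)
resp.raise_for_status()
body = resp.json()
print(body["message"], "-", len(body["data"]), "match(es)")
```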
backend/app/utils/faceSearch.py (new file)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| import uuid | ||
| from typing import Optional, List, Dict, Any | ||
| from pydantic import BaseModel | ||
| from app.config.settings import CONFIDENCE_PERCENT, DEFAULT_FACENET_MODEL | ||
| from app.database.faces import get_all_face_embeddings | ||
| from app.models.FaceDetector import FaceDetector | ||
| from app.models.FaceNet import FaceNet | ||
| from app.utils.FaceNet import FaceNet_util_cosine_similarity | ||
|
|
||
|
|
||
| class BoundingBox(BaseModel): | ||
| x: float | ||
| y: float | ||
| width: float | ||
| height: float | ||
|
|
||
|
|
||
| class ImageData(BaseModel): | ||
| id: str | ||
| path: str | ||
| folder_id: str | ||
| thumbnailPath: str | ||
| metadata: Dict[str, Any] | ||
| isTagged: bool | ||
| tags: Optional[List[str]] = None | ||
| bboxes: BoundingBox | ||
|
|
||
|
|
||
| class GetAllImagesResponse(BaseModel): | ||
| success: bool | ||
| message: str | ||
| data: List[ImageData] | ||
|
|
||
|
|
||
| def perform_face_search(image_path: str) -> GetAllImagesResponse: | ||
| """ | ||
| Performs face detection, embedding generation, and similarity search. | ||
|
|
||
| Args: | ||
| image_path (str): Path to the image file to process. | ||
|
|
||
| Returns: | ||
| GetAllImagesResponse: Search result containing matched images. | ||
| """ | ||
| fd = FaceDetector() | ||
| fn = FaceNet(DEFAULT_FACENET_MODEL) | ||
|
|
||
| try: | ||
| matches = [] | ||
| image_id = str(uuid.uuid4()) | ||
|
|
||
| try: | ||
| result = fd.detect_faces(image_id, image_path, forSearch=True) | ||
| except Exception as e: | ||
| return GetAllImagesResponse( | ||
| success=False, | ||
| message=f"Failed to process image: {str(e)}", | ||
| data=[], | ||
| ) | ||
| if not result or result["num_faces"] == 0: | ||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message="No faces detected in the image.", | ||
| data=[], | ||
| ) | ||
|
|
||
| process_face = result["processed_faces"][0] | ||
| new_embedding = fn.get_embedding(process_face) | ||
|
|
||
| images = get_all_face_embeddings() | ||
| if not images: | ||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message="No face embeddings available for comparison.", | ||
| data=[], | ||
| ) | ||
|
|
||
| for image in images: | ||
| similarity = FaceNet_util_cosine_similarity( | ||
| new_embedding, image["embeddings"] | ||
| ) | ||
| if similarity >= CONFIDENCE_PERCENT: | ||
| matches.append( | ||
| ImageData( | ||
| id=image["id"], | ||
| path=image["path"], | ||
| folder_id=image["folder_id"], | ||
| thumbnailPath=image["thumbnailPath"], | ||
| metadata=image["metadata"], | ||
| isTagged=image["isTagged"], | ||
| tags=image["tags"], | ||
| bboxes=image["bbox"], | ||
| ) | ||
| ) | ||
|
|
||
| return GetAllImagesResponse( | ||
| success=True, | ||
| message=f"Successfully retrieved {len(matches)} matching images.", | ||
| data=matches, | ||
| ) | ||
|
|
||
| finally: | ||
| if "fd" in locals() and fd is not None: | ||
| fd.close() | ||
| if "fn" in locals() and fn is not None: | ||
| fn.close() |
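The match gate in perform_face_search depends on FaceNet_util_cosine_similarity and CONFIDENCE_PERCENT, neither of which is shown in this diff. As a point of reference, a cosine-similarity threshold of this kind typically looks like the following sketch; the threshold value is a placeholder, not the project's actual setting.

```python
# Hedged sketch of the similarity gate used by perform_face_search.
# The real FaceNet_util_cosine_similarity and CONFIDENCE_PERCENT are not in
# this PR; the 0.7 threshold below is a placeholder.
import numpy as np

CONFIDENCE_PERCENT = 0.7  # placeholder threshold


def cosine_similarity(a, b) -> float:
    """Cosine of the angle between two embedding vectors, in [-1, 1]."""
    a = np.asarray(a, dtype=np.float32)
    b = np.asarray(b, dtype=np.float32)
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    return float(np.dot(a, b) / denom) if denom else 0.0


def is_match(query_embedding, stored_embedding) -> bool:
    # Mirrors the gate in perform_face_search: keep an image when its face
    # embedding meets or exceeds the configured confidence threshold.
    return cosine_similarity(query_embedding, stored_embedding) >= CONFIDENCE_PERCENT
```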
Fix typo in error message.

There's an extra space at the end of the error message. Apply this diff:

-                error="No Image path provided ",
+                error="No Image path provided",