diff --git a/backend/app/database/weaviate/__init__.py b/backend/app/database/weaviate/__init__.py
index e69de29b..44be87d6 100644
--- a/backend/app/database/weaviate/__init__.py
+++ b/backend/app/database/weaviate/__init__.py
@@ -0,0 +1,18 @@
+from .operations import (
+    store_user_profile,
+    search_similar_contributors,
+    search_contributors_by_keywords,
+    get_contributor_profile,
+    WeaviateUserOperations
+)
+
+from .client import get_weaviate_client
+
+__all__ = [
+    "store_user_profile",
+    "search_similar_contributors",
+    "search_contributors_by_keywords",
+    "get_contributor_profile",
+    "WeaviateUserOperations",
+    "get_weaviate_client"
+]
diff --git a/backend/app/database/weaviate/operations.py b/backend/app/database/weaviate/operations.py
index 600b52c0..e0d9fe17 100644
--- a/backend/app/database/weaviate/operations.py
+++ b/backend/app/database/weaviate/operations.py
@@ -1,11 +1,12 @@
 import logging
 import json
-from typing import Optional, Dict, Any
+from typing import Optional, Dict, Any, List
 from datetime import datetime, timezone
 from app.models.database.weaviate import WeaviateUserProfile
 from app.database.weaviate.client import get_weaviate_client
 import weaviate.exceptions as weaviate_exceptions
 import weaviate.classes as wvc
+from weaviate.classes.query import Filter
 
 logger = logging.getLogger(__name__)
 
@@ -26,7 +27,7 @@ async def find_user_by_id(self, user_id: str) -> Optional[str]:
             collection = client.collections.get(self.collection_name)
 
             response = await collection.query.fetch_objects(
-                where=wvc.query.Filter.by_property("user_id").equal(user_id),
+                filters=Filter.by_property("user_id").equal(user_id),
                 limit=1
             )
 
@@ -43,7 +44,7 @@ async def find_user_by_id(self, user_id: str) -> Optional[str]:
             logger.error(f"Unexpected error finding user by ID: {str(e)}")
             return None
 
-    async def create_user_profile(self, profile: WeaviateUserProfile) -> bool:
+    async def create_user_profile(self, profile: WeaviateUserProfile, embedding_vector: List[float]) -> bool:
         """
         Create a new user profile in Weaviate.
         """
@@ -54,7 +55,8 @@ async def create_user_profile(self, profile: WeaviateUserProfile) -> bool:
             collection = client.collections.get(self.collection_name)
 
             result = await collection.data.insert(
-                properties=profile_dict
+                properties=profile_dict,
+                vector=embedding_vector
             )
 
             logger.info(f"Created user profile for {profile.github_username} with UUID: {result}")
@@ -67,7 +69,7 @@ async def create_user_profile(self, profile: WeaviateUserProfile) -> bool:
             logger.error(f"Unexpected error creating user profile: {str(e)}")
             return False
 
-    async def update_user_profile(self, uuid: str, profile: WeaviateUserProfile) -> bool:
+    async def update_user_profile(self, uuid: str, profile: WeaviateUserProfile, embedding_vector: List[float]) -> bool:
         """
         Update an existing user profile in Weaviate.
""" @@ -78,7 +80,8 @@ async def update_user_profile(self, uuid: str, profile: WeaviateUserProfile) -> collection = client.collections.get(self.collection_name) await collection.data.update( uuid=uuid, - properties=profile_dict + properties=profile_dict, + vector=embedding_vector ) logger.info(f"Updated user profile for {profile.github_username} with UUID: {uuid}") @@ -91,7 +94,7 @@ async def update_user_profile(self, uuid: str, profile: WeaviateUserProfile) -> logger.error(f"Unexpected error updating user profile: {str(e)}") return False - async def upsert_user_profile(self, profile: WeaviateUserProfile) -> bool: + async def upsert_user_profile(self, profile: WeaviateUserProfile, embedding_vector: List[float]) -> bool: """ Create or update a user profile (upsert operation). """ @@ -100,15 +103,162 @@ async def upsert_user_profile(self, profile: WeaviateUserProfile) -> bool: if existing_uuid: logger.info(f"Updating existing profile for user_id: {profile.user_id}") - return await self.update_user_profile(existing_uuid, profile) + return await self.update_user_profile(existing_uuid, profile, embedding_vector) else: logger.info(f"Creating new profile for user_id: {profile.user_id}") - return await self.create_user_profile(profile) + return await self.create_user_profile(profile, embedding_vector) except Exception as e: logger.error(f"Error in upsert operation: {str(e)}") return False + async def search_similar_contributors(self, query_embedding: List[float], limit: int = 10, min_distance: float = 0.7) -> List[Dict[str, Any]]: + """Search for similar contributors using vector similarity search.""" + try: + logger.info(f"Searching for similar contributors with embedding dimension: {len(query_embedding)}") + + async with get_weaviate_client() as client: + collection = client.collections.get(self.collection_name) + + response = await collection.query.near_vector( + near_vector=query_embedding, + limit=limit, + distance=min_distance, + return_metadata=wvc.query.MetadataQuery(distance=True) + ) + + results = [] + for obj in response.objects: + try: + properties = obj.properties + distance = obj.metadata.distance if obj.metadata and obj.metadata.distance else 1.0 + similarity_score = 1.0 - distance + + result = { + "user_id": properties.get("user_id"), + "github_username": properties.get("github_username"), + "display_name": properties.get("display_name"), + "bio": properties.get("bio"), + "languages": properties.get("languages", []), + "topics": properties.get("topics", []), + "followers_count": properties.get("followers_count", 0), + "total_stars_received": properties.get("total_stars_received", 0), + "similarity_score": similarity_score, + "distance": distance, + "profile_summary": properties.get("profile_text_for_embedding", "") + } + results.append(result) + + except Exception as e: + logger.warning(f"Error processing search result: {str(e)}") + continue + + logger.info(f"Found {len(results)} similar contributors") + return results + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error in similarity search: {str(e)}") + return [] + except Exception as e: + logger.error(f"Unexpected error in similarity search: {str(e)}") + return [] + + async def search_contributors_by_keywords(self, keywords: List[str], limit: int = 10) -> List[Dict[str, Any]]: + """Search for contributors using keyword matching on profile text, languages, and topics.""" + try: + logger.info(f"Searching for contributors with keywords: {keywords}") + + async with get_weaviate_client() as client: + 
collection = client.collections.get(self.collection_name) + + keyword_query = " ".join(keywords) + + response = await collection.query.bm25( + query=keyword_query, + limit=limit, + return_metadata=wvc.query.MetadataQuery(score=True) + ) + + results = [] + for obj in response.objects: + try: + properties = obj.properties + score = obj.metadata.score if obj.metadata and obj.metadata.score else 0.0 + + result = { + "user_id": properties.get("user_id"), + "github_username": properties.get("github_username"), + "display_name": properties.get("display_name"), + "bio": properties.get("bio"), + "languages": properties.get("languages", []), + "topics": properties.get("topics", []), + "followers_count": properties.get("followers_count", 0), + "total_stars_received": properties.get("total_stars_received", 0), + "search_score": score, + "profile_summary": properties.get("profile_text_for_embedding", "") + } + results.append(result) + + except Exception as e: + logger.warning(f"Error processing keyword search result: {str(e)}") + continue + + logger.info(f"Found {len(results)} contributors matching keywords") + return results + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error in keyword search: {str(e)}") + return [] + except Exception as e: + logger.error(f"Unexpected error in keyword search: {str(e)}") + return [] + + # TODO: Add hybrid search for contributors. Default in built hybrid search doesn't support custom vectors. + + async def get_contributor_profile(self, github_username: str) -> Optional[WeaviateUserProfile]: + """Get a specific contributor's profile by GitHub username.""" + try: + async with get_weaviate_client() as client: + collection = client.collections.get(self.collection_name) + + response = await collection.query.fetch_objects( + filters=Filter.by_property("github_username").equal(github_username), + limit=1 + ) + + if response.objects: + properties = response.objects[0].properties + + repositories = json.loads(properties.get("repositories", "[]")) + pull_requests = json.loads(properties.get("pull_requests", "[]")) + + return WeaviateUserProfile( + user_id=properties.get("user_id"), + github_username=properties.get("github_username"), + display_name=properties.get("display_name"), + bio=properties.get("bio"), + location=properties.get("location"), + languages=properties.get("languages", []), + topics=properties.get("topics", []), + followers_count=properties.get("followers_count", 0), + following_count=properties.get("following_count", 0), + total_stars_received=properties.get("total_stars_received", 0), + total_forks=properties.get("total_forks", 0), + repositories=repositories, + pull_requests=pull_requests, + profile_text_for_embedding=properties.get("profile_text_for_embedding", ""), + last_updated=properties.get("last_updated") + ) + + return None + + except weaviate_exceptions.WeaviateBaseError as e: + logger.error(f"Weaviate error getting contributor profile: {str(e)}") + return None + except Exception as e: + logger.error(f"Unexpected error getting contributor profile: {str(e)}") + return None + def _prepare_profile_data(self, profile: WeaviateUserProfile) -> Dict[str, Any]: """ Prepare profile data for Weaviate storage. 
@@ -128,9 +278,28 @@ def _prepare_profile_data(self, profile: WeaviateUserProfile) -> Dict[str, Any]: return profile_dict -async def store_user_profile(profile: WeaviateUserProfile) -> bool: +async def store_user_profile(profile: WeaviateUserProfile, embedding_vector: List[float]) -> bool: """ Convenience function to store or update a user profile. """ operations = WeaviateUserOperations() - return await operations.upsert_user_profile(profile) + return await operations.upsert_user_profile(profile, embedding_vector) + +async def search_similar_contributors(query_embedding: List[float], limit: int = 10, min_distance: float = 0.7) -> List[Dict[str, Any]]: + """ + Convenience function to search for similar contributors using vector similarity. + """ + operations = WeaviateUserOperations() + return await operations.search_similar_contributors(query_embedding, limit, min_distance) + +async def search_contributors_by_keywords(keywords: List[str], limit: int = 10) -> List[Dict[str, Any]]: + """ + Convenience function to search for contributors using keyword matching. + """ + operations = WeaviateUserOperations() + return await operations.search_contributors_by_keywords(keywords, limit) + +async def get_contributor_profile(github_username: str) -> Optional[WeaviateUserProfile]: + """Convenience function to get a contributor's profile by GitHub username.""" + operations = WeaviateUserOperations() + return await operations.get_contributor_profile(github_username) diff --git a/backend/app/database/weaviate/scripts/create_schemas.py b/backend/app/database/weaviate/scripts/create_schemas.py index 351c47a8..53623499 100644 --- a/backend/app/database/weaviate/scripts/create_schemas.py +++ b/backend/app/database/weaviate/scripts/create_schemas.py @@ -6,6 +6,7 @@ async def create_schema(client, name, properties): await client.collections.create( name=name, properties=properties, + vectorizer_config=wc.Configure.Vectorizer.none() ) print(f"Created: {name}") diff --git a/backend/app/services/embedding_service/profile_summarization/prompts/summarization_prompt.py b/backend/app/services/embedding_service/profile_summarization/prompts/summarization_prompt.py new file mode 100644 index 00000000..e12d17c0 --- /dev/null +++ b/backend/app/services/embedding_service/profile_summarization/prompts/summarization_prompt.py @@ -0,0 +1,24 @@ +PROFILE_SUMMARIZATION_PROMPT = """You are a GitHub profile summarizer for a developer contributor recommendation system. Your task is to create a concise, keyword-rich summary optimized for semantic search and contributor matching. The summary should highlight the developer's technical expertise, recent contributions, and key projects to enable accurate and relevant recommendations. + +PROFILE DATA: +- GitHub Username: {github_username} +- Bio: {bio} +- Languages: {languages} +- Recent Pull Requests: {pull_requests} +- Topics/Skills: {topics} +- Stats: {stats} + +INSTRUCTIONS: +- Length: Maximum 150-200 words (at maximum 500 tokens). +- Lead with Top Skills: Start with the developer's most prominent technical skills and programming languages (e.g., Python, JavaScript, ML, AI). +- Focus on Recent Expertise: Emphasize areas of active, recent involvement, especially from pull requests and recent work. +- Include Key Projects/Organizations: Mention the most relevant projects or organizations the developer has contributed to. +- Use Specific Technology Names: Incorporate precise terms like frameworks, tools, and methodologies (e.g., React, TensorFlow, DevOps). 
+- Prioritize Pull Request Skills: Highlight skills and technologies mentioned in recent pull requests, as they reflect current expertise.
+- Style: Write in a technical, keyword-dense style. Use action verbs and quantifiable achievements where possible (e.g., "Led development of...," "Optimized performance by...").
+- Tone: Professional and focused. Avoid filler content; every word should support search relevance.
+- Format: Plain text, no formatting elements (e.g., bullet points, bold text).
+
+GOAL: Create a summary that is easily parsed by search algorithms, rich in relevant keywords, and clearly showcases the developer's technical strengths and recent contributions.
+
+Create a focused, search-optimized profile summary in plain text format:"""
diff --git a/backend/app/services/embedding_service/service.py b/backend/app/services/embedding_service/service.py
index ba39f78c..ea8948d4 100644
--- a/backend/app/services/embedding_service/service.py
+++ b/backend/app/services/embedding_service/service.py
@@ -2,9 +2,14 @@ import logging
 import os
 from typing import List, Dict, Any, Optional
 import torch
-from pydantic import BaseModel, Field
+from pydantic import BaseModel
 from dotenv import load_dotenv
 from sentence_transformers import SentenceTransformer
+from langchain_google_genai import ChatGoogleGenerativeAI
+from langchain_core.messages import HumanMessage
+from app.core.config import settings
+from app.models.database.weaviate import WeaviateUserProfile
+from app.services.embedding_service.profile_summarization.prompts.summarization_prompt import PROFILE_SUMMARIZATION_PROMPT
 
 load_dotenv()
 
@@ -15,51 +20,70 @@
 
 logger = logging.getLogger(__name__)
 
-class EmbeddingItem(BaseModel):
-    id: str
-    collection: str
-    content: str
-    metadata: Dict[str, Any] = Field(default_factory=dict)
+class ProfileSummaryResult(BaseModel):
+    """Result of profile summarization"""
+    summary_text: str
+    token_count_estimate: int
     embedding: Optional[List[float]] = None
 
 
 class EmbeddingService:
-    """Service for generating embeddings for text using Hugging Face models"""
+    """Service for generating embeddings and profile summarization for Weaviate integration"""
 
     def __init__(self, model_name: str = MODEL_NAME, device: str = EMBEDDING_DEVICE):
-        """Initialize the embedding service with specified model"""
+        """Initialize the embedding service with specified model and LLM"""
         self.model_name = model_name
         self.device = device
         self._model = None
+        self._llm = None
        logger.info(f"Initializing EmbeddingService with model: {model_name} on device: {device}")
 
     @property
     def model(self) -> SentenceTransformer:
-        """Lazy-load model to avoid loading during import"""
+        """Lazy-load embedding model to avoid loading during import"""
         if self._model is None:
             try:
-                logger.info(f"Loading model: {self.model_name}")
+                logger.info(f"Loading embedding model: {self.model_name}")
                 self._model = SentenceTransformer(self.model_name, device=self.device)
+                logger.info(
+                    f"Model loaded successfully. Embedding dimension: {self._model.get_sentence_embedding_dimension()}")
             except Exception as e:
                 logger.error(f"Error loading model {self.model_name}: {str(e)}")
                 raise
         return self._model
 
+    @property
+    def llm(self) -> ChatGoogleGenerativeAI:
+        """Lazy-load LLM for profile summarization"""
+        if self._llm is None:
+            try:
+                self._llm = ChatGoogleGenerativeAI(
+                    model=settings.github_agent_model,
+                    temperature=0.3,
+                    google_api_key=settings.gemini_api_key
+                )
+                logger.info("LLM initialized for profile summarization")
+            except Exception as e:
+                logger.error(f"Error initializing LLM: {str(e)}")
+                raise
+        return self._llm
+
     async def get_embedding(self, text: str) -> List[float]:
         """Generate embedding for a single text input"""
         try:
             # Convert to list for consistency
             if isinstance(text, str):
                 text = [text]
 
             # Generate embeddings
             embeddings = self.model.encode(
-                text, 
+                text,
                 convert_to_tensor=True,
                 show_progress_bar=False
             )
 
             # Convert to standard Python list and return
             embedding_list = embeddings[0].cpu().tolist()
+            logger.debug(f"Generated embedding with dimension: {len(embedding_list)}")
             return embedding_list
         except Exception as e:
             logger.error(f"Error generating embedding: {str(e)}")
@@ -70,47 +94,113 @@ async def get_embeddings(self, texts: List[str]) -> List[List[float]]:
         try:
             # Generate embeddings
             embeddings = self.model.encode(
-                texts, 
+                texts,
                 convert_to_tensor=True,
                 batch_size=MAX_BATCH_SIZE,
                 show_progress_bar=len(texts) > 10
             )
 
             # Convert to standard Python list
             embedding_list = embeddings.cpu().tolist()
+            logger.info(f"Generated {len(embedding_list)} embeddings")
             return embedding_list
         except Exception as e:
             logger.error(f"Error generating batch embeddings: {str(e)}")
             raise
 
-    # Process_item and process_items are used to add embeddings to items
-    # Can be migrated to a separate class if needed aligning both VectorDB service and Embedding service
-    async def process_item(self, item: EmbeddingItem) -> EmbeddingItem:
-        """Process a single item to add embedding"""
-        if not item.embedding:
-            item.embedding = await self.get_embedding(item.content)
-        return item
-
-    async def process_items(self, items: List[EmbeddingItem]) -> List[EmbeddingItem]:
-        """Process multiple items to add embeddings efficiently"""
-        # Extract content from items that need embeddings
-        texts_to_embed = []
-        items_to_embed = []
-
-        for item in items:
-            if not item.embedding:
-                texts_to_embed.append(item.content)
-                items_to_embed.append(item)
-
-        if texts_to_embed:
-            # Generate embeddings in batch
-            embeddings = await self.get_embeddings(texts_to_embed)
-
-            # Update items with their embeddings
-            for i, item in enumerate(items_to_embed):
-                item.embedding = embeddings[i]
-
-        return items
+    async def summarize_user_profile(self, profile: WeaviateUserProfile) -> ProfileSummaryResult:
+        """Generate a comprehensive summary of a user profile optimized for embedding and semantic search."""
+        try:
+            logger.info(f"Summarizing profile for user: {profile.github_username}")
+
+            bio = profile.bio or "No bio provided"
+            languages = ", ".join(profile.languages) if profile.languages else "No languages specified"
+            topics = ", ".join(profile.topics) if profile.topics else "No topics specified"
+
+            prs_info = []
+            for pr in profile.pull_requests:
+                pr_desc = pr.body if pr.body else "No description"
+                prs_info.append(f"{pr.title} in {pr.repository}: {pr_desc}")
+            pull_requests_text = " | ".join(prs_info) if prs_info else "No recent pull requests"
+
+            stats_text = f"Followers: {profile.followers_count}, Following: {profile.following_count}, Total Stars: {profile.total_stars_received}, Total Forks: {profile.total_forks}"
+
+            prompt = PROFILE_SUMMARIZATION_PROMPT.format(
+                github_username=profile.github_username,
+                bio=bio,
+                languages=languages,
+                pull_requests=pull_requests_text,
+                topics=topics,
+                stats=stats_text
+            )
+
+            logger.info(f"Sending profile summarization request to LLM for {profile.github_username}")
+            response = await self.llm.ainvoke([HumanMessage(content=prompt)])
+            summary_text = response.content.strip()
+
+            # Estimate token count (rough approximation: 1 token ≈ 4 characters)
+            token_estimate = len(summary_text) // 4
+            logger.info(
+                f"Generated profile summary for {profile.github_username}: {len(summary_text)} chars (~{token_estimate} tokens)"
+            )
+
+            embedding = await self.get_embedding(summary_text)
+
+            return ProfileSummaryResult(
+                summary_text=summary_text,
+                token_count_estimate=token_estimate,
+                embedding=embedding
+            )
+
+        except Exception as e:
+            logger.error(f"Error summarizing profile for {profile.github_username}: {str(e)}")
+            raise
+
+    async def process_user_profile(self, profile: WeaviateUserProfile) -> tuple[WeaviateUserProfile, List[float]]:
+        """Process a user profile by generating summary and embedding, then updating the profile object."""
+        try:
+            logger.info(f"Processing user profile for Weaviate storage: {profile.github_username}")
+
+            summary_result = await self.summarize_user_profile(profile)
+
+            profile.profile_text_for_embedding = summary_result.summary_text
+
+            logger.info(
+                f"Successfully processed profile for {profile.github_username}: summary generated with {summary_result.token_count_estimate} estimated tokens"
+            )
+
+            return profile, summary_result.embedding
+
+        except Exception as e:
+            logger.error(f"Error processing user profile for Weaviate: {str(e)}")
+            raise
+
+    async def search_similar_profiles(self, query_text: str, limit: int = 10) -> List[Dict[str, Any]]:
+        """
+        Search for similar profiles using embedding similarity.
+        This method generates an embedding for the query and searches for similar contributors.
+ """ + try: + logger.info(f"Searching for similar profiles with query: {query_text[:100]}") + + query_embedding = await self.get_embedding(query_text) + + logger.info(f"Generated query embedding with dimension: {len(query_embedding)}") + + from app.database.weaviate.operations import search_similar_contributors + + results = await search_similar_contributors( + query_embedding=query_embedding, + limit=limit, + min_distance=0.5 + ) + + logger.info(f"Found {len(results)} similar contributors for query") + return results + + except Exception as e: + logger.error(f"Error searching similar profiles: {str(e)}") + raise def get_model_info(self) -> Dict[str, Any]: """Get information about the model being used""" @@ -125,8 +215,12 @@ def clear_cache(self): if self._model: del self._model self._model = None - # Force garbage collection - import gc - gc.collect() - if torch.cuda.is_available(): - torch.cuda.empty_cache() + if self._llm: + del self._llm + self._llm = None + # Force garbage collection + import gc + gc.collect() + if torch.cuda.is_available(): + torch.cuda.empty_cache() + logger.info("Cleared embedding service cache") diff --git a/backend/app/services/user/profiling.py b/backend/app/services/user/profiling.py index 223f3abd..0b66b5a9 100644 --- a/backend/app/services/user/profiling.py +++ b/backend/app/services/user/profiling.py @@ -6,6 +6,7 @@ from collections import Counter from app.models.database.weaviate import WeaviateUserProfile, WeaviateRepository, WeaviatePullRequest from app.database.weaviate.operations import store_user_profile +from app.services.embedding_service.service import EmbeddingService from app.core.config import settings logger = logging.getLogger(__name__) @@ -281,7 +282,7 @@ async def build_user_profile(self, user_id: str, github_username: str) -> Option following_count=user_data.get("following", 0), total_stars_received=total_stars, total_forks=total_forks, - profile_text_for_embedding="", # TODO: Invoke agent/llm to generate this + profile_text_for_embedding="", # Will be filled by embedding service last_updated=datetime.now() ) @@ -294,17 +295,35 @@ async def build_user_profile(self, user_id: str, github_username: str) -> Option async def profile_user_from_github(user_id: str, github_username: str) -> bool: - """Profile a user and store in Weaviate with proper resource management.""" + """Profile a user, generate embeddings, and store in Weaviate.""" async with GitHubUserProfiler() as profiler: try: profile = await profiler.build_user_profile(user_id, github_username) - if profile: - success = await store_user_profile(profile) + if not profile: + logger.error(f"Failed to build profile for {github_username}") + return False + + logger.info(f"Processing profile for embedding: {github_username}") + embedding_service = EmbeddingService() + + try: + processed_profile, embedding_vector = await embedding_service.process_user_profile(profile) + logger.info(f"Successfully generated profile summary for {github_username}") + + success = await store_user_profile(processed_profile, embedding_vector) if success: - logger.info(f"Successfully stored profile for user {github_username}") + logger.info(f"Successfully stored profile for user {github_username} with embeddings") + else: + logger.error(f"Failed to store profile for user {github_username} in Weaviate") return success - return False + + except Exception as e: + logger.error(f"Error processing profile with embedding service for {github_username}: {str(e)}") + return False + finally: + embedding_service.clear_cache() + 
         except Exception as e:
             logger.error(f"Failed to profile user {github_username}: {str(e)}")
             return False
diff --git a/backend/app/services/vector_db/__init__.py b/backend/app/services/vector_db/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/backend/app/services/vector_db/service.py b/backend/app/services/vector_db/service.py
deleted file mode 100644
index 96acf213..00000000
--- a/backend/app/services/vector_db/service.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import logging
-
-import os
-from typing import List, Dict, Any, Optional
-from uuid import UUID
-from pydantic import BaseModel, Field
-from dotenv import load_dotenv
-from supabase import create_client, Client
-
-load_dotenv()
-
-supabase_url = os.getenv("SUPABASE_URL")
-supabase_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
-
-logger = logging.getLogger(__name__)
-
-class EmbeddingItem(BaseModel):
-    id: str
-
-    collection: str
-    content: str
-    metadata: Dict[str, Any] = Field(default_factory=dict)
-    embedding: Optional[List[float]] = None
-
-
-class VectorDBService:
-    """Service for interacting with Supabase Vector DB via RPC"""
-
-    def __init__(self, table_name: str = "embeddings"):
-        if not supabase_url or not supabase_key:
-            logger.warning("Supabase credentials not provided. Operations will fail.")
-
-        self.client: Client = create_client(supabase_url, supabase_key)
-        self.table_name = table_name
-
-    def _convert_embedding_to_pg_vector(self, embedding: List[float]) -> str:
-        """Convert a list of floats to PostgreSQL vector string format."""
-        if not embedding:
-            return "[]"
-        return '[' + ','.join(map(str, embedding)) + ']'
-
-    def _parse_pg_vector(self, vector_str: str) -> List[float]:
-        """Parse PostgreSQL vector string to list of floats."""
-        if not vector_str:
-            return None
-
-        vector_str = vector_str.strip('[]()')
-        return [float(x) for x in vector_str.split(',')]
-
-    async def create_table(self) -> bool:
-        """Create the embeddings table using an RPC function."""
-        try:
-            response = self.client.rpc("create_embeddings_table").execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Error creating table: {str(e)}")
-            return False
-
-    async def add_item(self, item: EmbeddingItem) -> bool:
-        """Insert a single item into the vector database using RPC."""
-        try:
-            embedding_str = self._convert_embedding_to_pg_vector(item.embedding)
-            response = self.client.rpc(
-                "add_embedding",
-                {
-                    "p_id": item.id,
-                    "p_collection": item.collection,
-                    "p_content": item.content,
-                    "p_metadata": item.metadata,
-                    "p_embedding": embedding_str,
-                },
-            ).execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Error adding item: {str(e)}")
-            return False
-
-    async def add_items(self, items: List[EmbeddingItem]) -> bool:
-        """Insert multiple items using a single RPC call."""
-        try:
-            data = [
-                {
-                    "p_id": str(item.id),
-                    "p_collection": item.collection,
-                    "p_content": item.content,
-                    "p_metadata": item.metadata,
-                    "p_embedding": self._convert_embedding_to_pg_vector(item.embedding),
-                }
-                for item in items
-            ]
-
-            response = self.client.rpc("add_multiple_embeddings", {"data": data}).execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Error adding multiple items: {str(e)}")
-            return False
-
-    async def search(
-        self, query_embedding: List[float], collection: str, limit: int = 5, threshold: float = 0.5
-    ) -> List[Dict[str, Any]]:
-        """Search for similar embeddings using Supabase RPC."""
-        try:
-            query_embedding_str = self._convert_embedding_to_pg_vector(query_embedding)
-            response = self.client.rpc(
-                "search_embeddings",
-                {
-                    "p_query_embedding": query_embedding_str,
-                    "p_collection": collection,
-                    "p_limit": limit,
-                    "p_threshold": threshold,
-                },
-            ).execute()
-            # Parse embeddings from strings to lists
-            results = response.data or []
-            for result in results:
-                result['embedding'] = self._parse_pg_vector(result['embedding'])
-            return results
-        except Exception as e:
-            logger.error(f"Error searching embeddings: {str(e)}")
-            return []
-
-    async def get_item(self, item_id: str, collection: str) -> Optional[EmbeddingItem]:
-        """Retrieve an item by ID and collection via RPC."""
-        try:
-            response = self.client.rpc(
-                "get_embedding",
-                {
-                    "p_id": item_id,
-                    "p_collection": collection
-                },
-            ).execute()
-
-            if response.data:
-                data = response.data[0]
-                # Parse the embedding string to a list
-                embedding = self._parse_pg_vector(data.get('embedding'))
-                return EmbeddingItem(
-                    id=data["id"],
-                    collection=data["collection"],
-                    content=data["content"],
-                    metadata=data["metadata"],
-                    embedding=self._parse_pg_vector(data.get('embedding')),
-                )
-            return None
-        except Exception as e:
-            logger.error(f"Error getting item: {str(e)}")
-            return None
-
-    async def update_item(self, item: EmbeddingItem) -> bool:
-        """Update an item via RPC."""
-        try:
-            embedding_str = self._convert_embedding_to_pg_vector(item.embedding)
-            response = self.client.rpc(
-                "update_embedding",
-                {
-                    "p_id": str(item.id),
-                    "p_collection": item.collection,
-                    "p_content": item.content,
-                    "p_metadata": item.metadata,
-                    "p_embedding": embedding_str,
-                },
-            ).execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Error updating item: {str(e)}")
-            return False
-
-    async def delete_item(self, item_id: str, collection: str) -> bool:
-        """Delete an item via RPC."""
-        try:
-            response = self.client.rpc(
-                "delete_embedding",
-                {
-                    "p_id": item_id,
-                    "p_collection": collection
-                }
-            ).execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Error deleting item: {str(e)}")
-            return False
-
-    async def list_collections(self) -> List[str]:
-        """List all unique collections via RPC."""
-        try:
-            response = self.client.rpc("list_collections").execute()
-            return [row["collection"] for row in response.data] if response.data else []
-        except Exception as e:
-            logger.error(f"Error listing collections: {str(e)}")
-            return []
-
-    async def check_connection(self) -> bool:
-        """Check Supabase connection via an RPC function."""
-        try:
-            response = self.client.rpc("check_embeddings_connection").execute()
-            return response.data is not None
-        except Exception as e:
-            logger.error(f"Connection check failed: {str(e)}")
-            return False
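A minimal end-to-end usage sketch of the pipeline this diff introduces (reviewer note, not part of the change). It exercises only the public API added above; the user id "42" and username "octocat" are placeholder values, and it assumes a reachable Weaviate instance plus Gemini credentials in settings.

import asyncio

from app.services.user.profiling import profile_user_from_github
from app.services.embedding_service.service import EmbeddingService
from app.database.weaviate.operations import (
    search_contributors_by_keywords,
    get_contributor_profile,
)

async def main() -> None:
    # 1. Build the GitHub profile, summarize it with the LLM, embed the
    #    summary, and upsert everything into Weaviate.
    stored = await profile_user_from_github(user_id="42", github_username="octocat")
    if not stored:
        return

    # 2. Semantic search: the service embeds the query text and delegates
    #    to the near_vector search in operations.py.
    service = EmbeddingService()
    try:
        similar = await service.search_similar_profiles(
            "Python backend developer with FastAPI and vector-database experience",
            limit=5,
        )
        for hit in similar:
            print(hit["github_username"], round(hit["similarity_score"], 3))
    finally:
        service.clear_cache()

    # 3. BM25 keyword search and a direct lookup by username.
    by_keyword = await search_contributors_by_keywords(["fastapi", "weaviate"], limit=5)
    profile = await get_contributor_profile("octocat")
    print(len(by_keyword), profile.github_username if profile else None)

asyncio.run(main())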
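And a short sketch of the bring-your-own-vector contract that motivates the Vectorizer.none() schema change: with no server-side vectorizer, every insert and every vector query must carry an explicit vector. The collection name "UserProfile" is an assumption for illustration; the real value lives in WeaviateUserOperations.collection_name.

import weaviate.classes as wvc
from weaviate.classes.query import Filter

from app.database.weaviate.client import get_weaviate_client
from app.services.embedding_service.service import EmbeddingService

async def byo_vector_demo() -> None:
    service = EmbeddingService()
    vector = await service.get_embedding("Rust systems programmer, tokio and gRPC")

    async with get_weaviate_client() as client:
        collection = client.collections.get("UserProfile")  # assumed name

        # With Vectorizer.none(), Weaviate stores exactly the vector we supply.
        await collection.data.insert(
            properties={"user_id": "demo-1", "github_username": "octocat"},
            vector=vector,
        )

        # Vector queries must likewise supply the query vector themselves.
        response = await collection.query.near_vector(
            near_vector=vector,
            limit=3,
            return_metadata=wvc.query.MetadataQuery(distance=True),
        )

        # Scalar filters are unaffected by the vectorizer setting.
        exact = await collection.query.fetch_objects(
            filters=Filter.by_property("user_id").equal("demo-1"),
            limit=1,
        )
        print(len(response.objects), len(exact.objects))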