Enhance VectorStore abstract base class with pydantic integration and common utilities #212

@manavgup

Description

📋 Overview

Modernize the vectordbs/vector_store.py abstract base class to integrate with enhanced pydantic models from #211, provide common utilities, and reduce boilerplate code across all vector database implementations.

🎯 Goals

  • Integrate seamlessly with the enhanced pydantic models from #211 (Enhance data_types.py with vector database optimized pydantic models)
  • Reduce boilerplate code in vector DB implementations
  • Standardize error handling and response patterns
  • Provide common utilities all vector DBs can leverage
  • Improve connection management and health monitoring
  • Maintain backward compatibility during transition

🔧 Technical Specifications

Enhanced Abstract Base Class

1. Core Abstract Methods (Updated Signatures)

# CollectionConfig, EmbeddedChunk, VectorSearchRequest, QueryResult and the
# response models come from the enhanced data_types.py in #211.
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterator, List

class VectorStore(ABC):
    """Enhanced abstract base class with pydantic integration"""
    
    def __init__(self, settings: Settings):
        self.settings = settings
        self._client = None
        self._connection_pool = None
        self.vector_db_type: str = "unknown"  # Override in subclasses
        
    # Core abstract methods with pydantic models
    @abstractmethod
    async def _create_collection_impl(self, collection: CollectionConfig) -> None:
        """Implementation-specific collection creation"""
        pass
    
    @abstractmethod
    async def _add_documents_impl(self, collection_name: str, chunks: List[EmbeddedChunk]) -> List[str]:
        """Implementation-specific document addition"""
        pass
    
    @abstractmethod
    async def _search_impl(self, request: VectorSearchRequest) -> List[QueryResult]:
        """Implementation-specific search"""
        pass
    
    @abstractmethod
    async def _delete_collection_impl(self, collection_name: str) -> None:
        """Implementation-specific collection deletion"""
        pass
    
    @abstractmethod
    async def _get_collection_stats_impl(self, collection_name: str) -> Dict[str, Any]:
        """Implementation-specific statistics (backs get_collection_stats below)"""
        pass
    
    @abstractmethod
    async def _health_check_impl(self) -> Dict[str, Any]:
        """Implementation-specific health check"""
        pass
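To make the contract concrete, here is a minimal in-memory subclass sketch. The `CollectionConfig` and `EmbeddedChunk` dataclasses below are hypothetical stand-ins for the pydantic models proposed in #211, and the base class is trimmed to two methods for brevity:

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List

# Hypothetical stand-ins for the #211 pydantic models.
@dataclass
class CollectionConfig:
    name: str
    dimension: int

@dataclass
class EmbeddedChunk:
    chunk_id: str
    embedding: List[float]

class VectorStore(ABC):
    """Trimmed-down base class, for illustration only."""

    @abstractmethod
    async def _create_collection_impl(self, collection: CollectionConfig) -> None: ...

    @abstractmethod
    async def _add_documents_impl(
        self, collection_name: str, chunks: List[EmbeddedChunk]
    ) -> List[str]: ...

class InMemoryVectorStore(VectorStore):
    """Toy implementation that keeps chunks in a dict."""

    def __init__(self) -> None:
        self._collections: Dict[str, List[EmbeddedChunk]] = {}

    async def _create_collection_impl(self, collection: CollectionConfig) -> None:
        self._collections[collection.name] = []

    async def _add_documents_impl(
        self, collection_name: str, chunks: List[EmbeddedChunk]
    ) -> List[str]:
        self._collections[collection_name].extend(chunks)
        return [c.chunk_id for c in chunks]

async def demo() -> List[str]:
    store = InMemoryVectorStore()
    # The _impl methods are called directly here only for the demo;
    # real callers go through the public wrappers.
    await store._create_collection_impl(CollectionConfig(name="docs", dimension=3))
    return await store._add_documents_impl("docs", [EmbeddedChunk("c1", [0.1, 0.2, 0.3])])

ids = asyncio.run(demo())
```

Subclasses only implement the `_impl` hooks; validation, batching, and error wrapping live in the shared public methods.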

2. Standardized Public Methods

    # Public methods using pydantic models and error handling
    async def create_collection(self, collection_config: CollectionConfig) -> CollectionResponse:
        """Standardized collection creation with validation"""
        try:
            # Validate configuration for this vector DB type
            collection_config.validate_for_vector_db(self.vector_db_type)
            
            # Check if collection already exists
            if await self._collection_exists(collection_config.name):
                return CollectionResponse.error(f"Collection '{collection_config.name}' already exists")
            
            await self._create_collection_impl(collection_config)
            return CollectionResponse.success(collection_config.name)
        except Exception as e:
            logger.error(f"Failed to create collection: {e}")
            return CollectionResponse.error(str(e))
    
    async def add_documents(self, request: DocumentIngestionRequest) -> DocumentIngestionResponse:
        """Standardized document addition with batch processing"""
        try:
            # Validate collection exists
            if not await self._collection_exists(request.collection_name):
                return DocumentIngestionResponse.error(f"Collection '{request.collection_name}' does not exist")
            
            # Get embedded chunks and validate
            embedded_chunks = request.get_embedded_chunks()
            if not embedded_chunks:
                return DocumentIngestionResponse.error("No embedded chunks found in request")
            
            # Process in batches for efficiency
            all_ids = []
            for batch in self._batch_chunks(embedded_chunks, request.batch_size):
                batch_ids = await self._add_documents_impl(request.collection_name, batch)
                all_ids.extend(batch_ids)
                
            return DocumentIngestionResponse.success(all_ids)
        except Exception as e:
            logger.error(f"Failed to add documents: {e}")
            return DocumentIngestionResponse.error(str(e))
    
    async def search(self, request: VectorSearchRequest) -> SearchResponse:
        """Standardized search with validation"""
        try:
            # Validate collection exists
            if not await self._collection_exists(request.collection_name):
                return SearchResponse.error(f"Collection '{request.collection_name}' does not exist")
            
            # Convert text query to embeddings if needed
            if isinstance(request.query, str):
                request = await self._convert_text_to_embedding_query(request)
            
            results = await self._search_impl(request)
            return SearchResponse.success(results)
        except Exception as e:
            logger.error(f"Search failed: {e}")
            return SearchResponse.error(str(e))
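The success/error envelope these methods return can be sketched with a small generic class. This is an assumption about the shape of the #211 response models, not their actual definition; the constructors are named `ok`/`fail` here only because classmethods named `success`/`error` would shadow the same-named fields:

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")

@dataclass
class VectorDBResponse(Generic[T]):
    """Hypothetical envelope: carries either data or an error message, never both."""
    success: bool
    data: Optional[T] = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, data: T) -> "VectorDBResponse[T]":
        return cls(success=True, data=data)

    @classmethod
    def fail(cls, message: str) -> "VectorDBResponse[T]":
        return cls(success=False, error=message)

# Callers branch on .success instead of wrapping every call in try/except.
resp = VectorDBResponse.ok({"count": 2})
err = VectorDBResponse.fail("collection 'docs' does not exist")
```

Returning an envelope rather than raising keeps the public API uniform: one control-flow path for callers, with exceptions confined to the `_impl` layer.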

3. Common Utility Methods

    # Utility methods all implementations can use
    def _batch_chunks(self, chunks: List[EmbeddedChunk], batch_size: int) -> Iterator[List[EmbeddedChunk]]:
        """Standard batching logic for efficient processing"""
        for i in range(0, len(chunks), batch_size):
            yield chunks[i:i + batch_size]
    
    async def _collection_exists(self, collection_name: str) -> bool:
        """Check if collection exists (implemented per vector DB)"""
        try:
            stats = await self.get_collection_stats(collection_name)
            return stats.success
        except Exception:  # a bare except would also swallow KeyboardInterrupt
            return False
    
    async def _convert_text_to_embedding_query(self, request: VectorSearchRequest) -> VectorSearchRequest:
        """Convert text query to embedding query using configured embedding service"""
        from vectordbs.utils.watsonx import get_embeddings
        
        embeddings = get_embeddings(request.query, settings=self.settings)
        if not embeddings:
            raise ValueError(f"Failed to generate embeddings for query: {request.query}")
        
        # Create new request with embeddings
        return VectorSearchRequest(
            collection_name=request.collection_name,
            query=embeddings[0],
            filters=request.filters,
            limit=request.limit,
            include_metadata=request.include_metadata
        )
    
    async def health_check(self) -> HealthCheckResponse:
        """Standard health check with connection validation"""
        try:
            health_data = await self._health_check_impl()
            health_data.update({
                "vector_db_type": self.vector_db_type,
                "timestamp": datetime.now().isoformat(),
                "status": "healthy"
            })
            return HealthCheckResponse.success(health_data)
        except Exception as e:
            error_data = {
                "vector_db_type": self.vector_db_type,
                "timestamp": datetime.now().isoformat(),
                "status": "unhealthy",
                "error": str(e)
            }
            return HealthCheckResponse.error(str(e), metadata=error_data)
    
    async def get_collection_stats(self, collection_name: str) -> VectorDBResponse[Dict[str, Any]]:
        """Get collection statistics"""
        try:
            stats = await self._get_collection_stats_impl(collection_name)
            return VectorDBResponse.success(stats)
        except Exception as e:
            return VectorDBResponse.error(str(e))
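The batching utility is simple enough to demonstrate standalone. This sketch reproduces the same slicing logic outside the class, with an added guard for a non-positive batch size (the guard is an assumption, not in the spec above):

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def batch_items(items: List[T], batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# Seven items in batches of three: two full batches and one short final batch.
batches = list(batch_items(list(range(7)), 3))
```

Because it is a generator, only one batch is materialized at a time, which is what keeps memory usage linear in batch size rather than in document count.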

4. Connection Management

    async def connect(self) -> None:
        """Initialize connection to vector database"""
        if self._client is None:
            self._client = await self._create_client()
    
    async def disconnect(self) -> None:
        """Clean up connections"""
        if self._client:
            await self._cleanup_client()
            self._client = None
    
    async def __aenter__(self):
        """Async context manager support"""
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager cleanup"""
        await self.disconnect()
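A usage sketch of the lifecycle, with a dummy store standing in for a real implementation (the `object()` client is a placeholder for whatever `_create_client()` would return):

```python
import asyncio
from typing import List

class DummyStore:
    """Minimal stand-in showing the connect/disconnect lifecycle."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._client = None

    async def connect(self) -> None:
        if self._client is None:
            self._client = object()  # placeholder for _create_client()
            self.events.append("connected")

    async def disconnect(self) -> None:
        if self._client is not None:
            self._client = None
            self.events.append("disconnected")

    async def __aenter__(self) -> "DummyStore":
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.disconnect()

async def main() -> List[str]:
    store = DummyStore()
    async with store:               # connects on entry
        store.events.append("searching")
    return store.events             # disconnected on exit, even on error

events = asyncio.run(main())
```

`__aexit__` runs even when the body raises, so connections are released on error paths without every caller writing try/finally.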

Backward Compatibility Layer

Legacy Method Support

    # Backward compatibility shims (deprecated but functional).
    # NOTE: these sync wrappers cannot share a class body with the async
    # methods of the same name above -- Python would silently replace the
    # earlier definition. House them in a compatibility wrapper or subclass.
    def create_collection(self, collection_name: str, metadata: Optional[dict] = None) -> None:
        """Legacy method - use create_collection(CollectionConfig) instead"""
        warnings.warn("create_collection(str) is deprecated, use CollectionConfig", DeprecationWarning)
        config = CollectionConfig(name=collection_name, dimension=self.settings.embedding_dim)
        return asyncio.run(self.create_collection(config))
    
    def add_documents(self, collection_name: str, documents: List[Document]) -> List[str]:
        """Legacy method - use add_documents(DocumentIngestionRequest) instead"""
        warnings.warn("add_documents(str, List[Document]) is deprecated", DeprecationWarning)
        request = DocumentIngestionRequest(collection_name=collection_name, documents=documents)
        response = asyncio.run(self.add_documents(request))
        if response.success:
            return response.data
        else:
            raise RuntimeError(response.error)
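The shim pattern in isolation, with hypothetical method names chosen to avoid the sync/async collision: warn, then drive the coroutine to completion with `asyncio.run`. Note that `asyncio.run` raises if an event loop is already running, so shims like this are only safe from purely synchronous callers:

```python
import asyncio
import warnings

class Store:
    async def create_collection_async(self, name: str) -> str:
        """New-style async API (hypothetical name)."""
        return f"created:{name}"

    def create_collection(self, name: str) -> str:
        """Legacy sync shim: warn, then block on the async method."""
        warnings.warn(
            "create_collection(str) is deprecated; use the async API",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not this shim
        )
        return asyncio.run(self.create_collection_async(name))

# Demonstrate that the shim both works and warns.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = Store().create_collection("docs")
```

`stacklevel=2` makes the DeprecationWarning point at the call site, which is what migration tooling and test runners need to locate legacy usages.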

✅ Acceptance Criteria

Functional Requirements

  • All abstract methods use the enhanced pydantic models from #211 (Enhance data_types.py with vector database optimized pydantic models)
  • Common utilities reduce code duplication across implementations
  • Standardized error handling with structured responses
  • Health check and statistics methods work for all vector DBs
  • Batch processing handles large document sets efficiently
  • Connection management supports async patterns

Technical Requirements

  • Maintain backward compatibility with existing implementations
  • Use async/await patterns throughout
  • Comprehensive logging for debugging
  • Type hints for all methods
  • Generic response handling with proper error states
  • Support for dependency injection with Settings

Performance Requirements

  • Batch processing reduces API calls by 90%+
  • Health checks complete within 5 seconds
  • Connection pooling reduces latency by 50%+
  • Memory usage scales linearly with batch size

Testing Requirements

  • Unit tests for all common utility methods
  • Integration tests with mock vector DB implementations
  • Performance tests for batch processing
  • Backward compatibility tests
  • Error handling and edge case tests

🔄 Implementation Details

File Changes

  • vectordbs/vector_store.py - Primary implementation
  • tests/unit/test_vector_store_base.py - Unit tests
  • tests/integration/test_vector_store_integration.py - Integration tests
  • Documentation updates for new patterns

Dependencies

Migration Strategy

  • Implement new methods alongside existing ones
  • Add deprecation warnings for old patterns
  • Provide migration documentation
  • Gradual adoption in vector DB implementations

🧪 Testing Strategy

Unit Tests

def test_batch_processing():
    """Test _batch_chunks utility method"""
    
def test_error_handling():
    """Test standardized error response patterns"""
    
def test_collection_validation():
    """Test collection existence checks"""
    
def test_backward_compatibility():
    """Test legacy method support"""

Integration Tests

def test_pydantic_integration():
    \"\"\"Test integration with enhanced pydantic models\"\"\"
    
def test_health_check_implementation():
    \"\"\"Test health check across different vector DBs\"\"\"

📊 Success Metrics

  • 50% reduction in boilerplate code in vector DB implementations
  • 100% test coverage for common utility methods
  • Zero breaking changes to existing vector DB implementations
  • <5 second health check response times
  • 90%+ reduction in duplicate error handling code

🔗 Related Issues

📝 Notes

This enhancement provides the foundation for cleaner vector database implementations while maintaining full backward compatibility. The async patterns and pydantic integration will significantly improve code quality and developer experience.

Priority: High
Estimated Effort: Large (5-7 days)
Risk Level: Medium (interface changes with backward compatibility)

Labels: enhancement (New feature or request), vector-db (Vector database related)