diff --git a/Cargo.lock b/Cargo.lock index 257c79a9..bf4c60cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6476,15 +6476,6 @@ dependencies = [ "portable-atomic", ] -[[package]] -name = "portpicker" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be97d76faf1bfab666e1375477b23fde79eccf0276e9b63b92a39d676a889ba9" -dependencies = [ - "rand 0.8.5", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -8490,7 +8481,6 @@ dependencies = [ "lancedb", "mockall", "ollama-rs", - "once_cell", "parquet", "pgvector", "qdrant-client", @@ -8560,13 +8550,11 @@ dependencies = [ "anyhow", "async-openai", "mockall", - "portpicker", "qdrant-client", "serde", "serde_json", "swiftide-core", "swiftide-integrations", - "tempfile", "testcontainers", "tokio", "wiremock", diff --git a/Cargo.toml b/Cargo.toml index afd616ef..83938432 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = ["swiftide", "swiftide-*", "examples", "benchmarks"] +default-members = ["swiftide", "swiftide-*"] resolver = "2" @@ -39,7 +40,6 @@ indoc = { version = "2.0" } regex = { version = "1.11.1" } uuid = { version = "1.10", features = ["v3", "v4", "serde"] } dyn-clone = { version = "1.0" } -once_cell = { version = "1.20.2" } # Integrations spider = { version = "2.13" } @@ -92,8 +92,7 @@ temp-dir = "0.1.13" wiremock = "0.6.0" test-case = "3.3.1" insta = { version = "1.41.1", features = ["yaml"] } -tempfile = "3.10.1" -portpicker = "0.1.1" + [workspace.lints.rust] unsafe_code = "forbid" diff --git a/swiftide-integrations/Cargo.toml b/swiftide-integrations/Cargo.toml index 0737bd02..834ef580 100644 --- a/swiftide-integrations/Cargo.toml +++ b/swiftide-integrations/Cargo.toml @@ -27,7 +27,6 @@ strum = { workspace = true } strum_macros = { workspace = true } regex = { workspace = true } futures-util = { workspace = true } -once_cell = { workspace = true } # Integrations async-openai = { workspace = true, optional = true } @@ -89,6 +88,7 @@ test-case = { workspace = true } indoc = { workspace = true } insta = { workspace = true } + [features] default = ["rustls"] # Ensures rustls is used diff --git a/swiftide-integrations/src/pgvector/fixtures.rs b/swiftide-integrations/src/pgvector/fixtures.rs index 9e64d930..6508893a 100644 --- a/swiftide-integrations/src/pgvector/fixtures.rs +++ b/swiftide-integrations/src/pgvector/fixtures.rs @@ -1,4 +1,43 @@ -//! This module implements common types and helper utilities for unit tests related to the pgvector +//! Test fixtures and utilities for pgvector integration testing. +//! +//! Provides test infrastructure and helper types to verify vector storage and retrieval: +//! - Mock data generation for different embedding modes +//! - Test containers for `PostgreSQL` with pgvector extension +//! - Common test scenarios and assertions +//! +//! # Examples +//! +//! ```rust +//! use swiftide_integrations::pgvector::fixtures::{TestContext, PgVectorTestData}; +//! use swiftide_core::indexing::{EmbedMode, EmbeddedField}; +//! +//! # async fn example() -> Result<(), Box> { +//! // Initialize test context with PostgreSQL container +//! let context = TestContext::setup_with_cfg( +//! Some(vec!["category", "priority"]), +//! vec![EmbeddedField::Combined].into_iter().collect() +//! ).await?; +//! +//! // Create test data for different embedding modes +//! let test_data = PgVectorTestData { +//! embed_mode: EmbedMode::SingleWithMetadata, +//! chunk: "test content", +//! metadata: None, +//! vectors: vec![PgVectorTestData::create_test_vector( +//! EmbeddedField::Combined, +//! 1.0 +//! )], +//! }; +//! # Ok(()) +//! # } +//! ``` +//! +//! The module supports testing for: +//! - Single embedding with/without metadata +//! - Per-field embeddings +//! - Combined embedding modes +//! - Different vector configurations +//! - Various metadata scenarios use crate::pgvector::PgVector; use std::collections::HashSet; use swiftide_core::{ @@ -7,11 +46,36 @@ use swiftide_core::{ }; use testcontainers::{ContainerAsync, GenericImage}; +/// Test data structure for pgvector integration testing. +/// +/// Provides a flexible structure to test different embedding modes and configurations, +/// including metadata handling and vector generation. +/// +/// # Examples +/// +/// ```rust +/// use swiftide_integrations::pgvector::fixtures::PgVectorTestData; +/// use swiftide_core::indexing::{EmbedMode, EmbeddedField}; +/// +/// let test_data = PgVectorTestData { +/// embed_mode: EmbedMode::SingleWithMetadata, +/// chunk: "test content", +/// metadata: None, +/// vectors: vec![PgVectorTestData::create_test_vector( +/// EmbeddedField::Combined, +/// 1.0 +/// )], +/// }; +/// ``` #[derive(Clone)] pub(crate) struct PgVectorTestData<'a> { + /// Embedding mode for the test case pub embed_mode: indexing::EmbedMode, + /// Test content chunk pub chunk: &'a str, + /// Optional metadata for testing metadata handling pub metadata: Option, + /// Vector embeddings with their corresponding fields pub vectors: Vec<(indexing::EmbeddedField, Vec)>, } @@ -42,8 +106,32 @@ impl<'a> PgVectorTestData<'a> { } } +/// Test context managing `PostgreSQL` container and pgvector storage. +/// +/// Handles the lifecycle of test containers and provides configured storage +/// instances for testing. +/// +/// # Examples +/// +/// ```rust +/// # use swiftide_integrations::pgvector::fixtures::TestContext; +/// # use swiftide_core::indexing::EmbeddedField; +/// # async fn example() -> Result<(), Box> { +/// // Setup test context with specific configuration +/// let context = TestContext::setup_with_cfg( +/// Some(vec!["category"]), +/// vec![EmbeddedField::Combined].into_iter().collect() +/// ).await?; +/// +/// // Use context for testing +/// context.pgv_storage.setup().await?; +/// # Ok(()) +/// # } +/// ``` pub(crate) struct TestContext { + /// Configured pgvector storage instance pub(crate) pgv_storage: PgVector, + /// Container instance running `PostgreSQL` with pgvector _pgv_db_container: ContainerAsync, } diff --git a/swiftide-integrations/src/pgvector/mod.rs b/swiftide-integrations/src/pgvector/mod.rs index ecf0410e..1bed613c 100644 --- a/swiftide-integrations/src/pgvector/mod.rs +++ b/swiftide-integrations/src/pgvector/mod.rs @@ -1,8 +1,27 @@ -//! This module integrates with the pgvector database, providing functionalities to create and manage vector collections, -//! store data, and optimize indexing for efficient searches. +//! Integration module for `PostgreSQL` vector database (pgvector) operations. //! -//! pgvector is utilized in both the `indexing::Pipeline` and `query::Pipeline` modules. - +//! This module provides a client interface for vector similarity search operations using pgvector, +//! supporting: +//! - Vector collection management with configurable schemas +//! - Efficient vector storage and indexing +//! - Connection pooling with automatic retries +//! - Batch operations for optimized performance +//! +//! The functionality is primarily used through the [`PgVector`] client, which implements +//! the [`Persist`] trait for seamless integration with indexing and query pipelines. +//! +//! # Example +//! ```rust +//! # use swiftide_integrations::pgvector::PgVector; +//! # async fn example() -> anyhow::Result<()> { +//! let client = PgVector::builder() +//! .db_url("postgresql://localhost:5432/vectors") +//! .vector_size(384) +//! .build()?; +//! +//! # Ok(()) +//! # } +//! ``` #[cfg(test)] mod fixtures; @@ -10,10 +29,10 @@ mod persist; mod pgv_table_types; use anyhow::Result; use derive_builder::Builder; -use once_cell::sync::OnceCell; use sqlx::PgPool; use std::fmt; use std::sync::Arc; +use std::sync::OnceLock; use tokio::time::Duration; use pgv_table_types::{FieldConfig, MetadataConfig, VectorConfig}; @@ -70,8 +89,12 @@ pub struct PgVector { db_conn_retry_delay: Duration, /// Lazy-initialized database connection pool. - #[builder(default = "Arc::new(OnceCell::new())")] - connection_pool: Arc>, + #[builder(default = "Arc::new(OnceLock::new())")] + connection_pool: Arc>, + + /// SQL statement used for executing bulk insert. + #[builder(default = "Arc::new(OnceLock::new())")] + sql_stmt_bulk_insert: Arc>, } impl fmt::Debug for PgVector { @@ -346,6 +369,7 @@ mod tests { ]) ; "Both mode with metadata")] #[test_log::test(tokio::test)] + #[ignore] async fn test_persist_nodes( test_cases: Vec>, vector_fields: HashSet, diff --git a/swiftide-integrations/src/pgvector/persist.rs b/swiftide-integrations/src/pgvector/persist.rs index 43bb723d..6b9973ae 100644 --- a/swiftide-integrations/src/pgvector/persist.rs +++ b/swiftide-integrations/src/pgvector/persist.rs @@ -1,8 +1,14 @@ -//! This module implements the `Persist` trait for the `PgVector` struct. -//! It provides methods for setting up storage, saving individual nodes, and batch-storing multiple nodes. -//! This integration enables the Swiftide project to use `PgVector` as a storage backend. +//! Storage persistence implementation for vector embeddings. +//! +//! Implements the [`Persist`] trait for [`PgVector`], providing vector storage capabilities: +//! - Database schema initialization and setup +//! - Single-node storage operations +//! - Optimized batch storage with configurable batch sizes +//! +//! The implementation ensures thread-safe concurrent access and handles +//! connection management automatically. use crate::pgvector::PgVector; -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_trait::async_trait; use swiftide_core::{ indexing::{IndexingStream, Node}, @@ -16,6 +22,14 @@ impl Persist for PgVector { // Get or initialize the connection pool let pool = self.pool_get_or_initialize().await?; + if self.sql_stmt_bulk_insert.get().is_none() { + let sql = self.generate_unnest_upsert_sql()?; + + self.sql_stmt_bulk_insert + .set(sql) + .map_err(|_| anyhow!("SQL bulk store statement is already set"))?; + } + let mut tx = pool.begin().await?; // Create extension diff --git a/swiftide-integrations/src/pgvector/pgv_table_types.rs b/swiftide-integrations/src/pgvector/pgv_table_types.rs index f133c63d..ae22080b 100644 --- a/swiftide-integrations/src/pgvector/pgv_table_types.rs +++ b/swiftide-integrations/src/pgvector/pgv_table_types.rs @@ -1,21 +1,25 @@ -//! This module provides functionality to convert a `Node` into a `PostgreSQL` table schema. -//! This conversion is crucial for storing data in `PostgreSQL`, enabling efficient vector similarity searches -//! through the `pgvector` extension. The module also handles metadata augmentation and ensures compatibility -//! with `PostgreSQL`'s required data format. - +//! `PostgreSQL` table schema and type conversion utilities for vector storage. +//! +//! Provides schema configuration and data type conversion functionality: +//! - Table schema generation with vector and metadata columns +//! - Field configuration for different vector embedding types +//! - HNSW index creation for similarity search optimization +//! - Bulk data preparation and SQL query generation +//! use crate::pgvector::PgVector; use anyhow::{anyhow, Result}; -// use once_cell::sync::OnceCell; use pgvector as ExtPgVector; use regex::Regex; use sqlx::postgres::PgArguments; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; -use std::collections::BTreeMap; -// use std::sync::Arc; use swiftide_core::indexing::{EmbeddedField, Node}; use tokio::time::sleep; +/// Configuration for vector embedding columns in the `PostgreSQL` table. +/// +/// This struct defines how vector embeddings are stored and managed in the database, +/// mapping Swiftide's embedded fields to `PostgreSQL` vector columns. #[derive(Clone, Debug)] pub struct VectorConfig { embedded_field: EmbeddedField, @@ -40,6 +44,10 @@ impl From for VectorConfig { } } +/// Configuration for metadata fields in the `PostgreSQL` table. +/// +/// Handles the mapping and storage of metadata fields, ensuring proper column naming +/// and type conversion for `PostgreSQL` compatibility. #[derive(Clone, Debug)] pub struct MetadataConfig { field: String, @@ -62,11 +70,26 @@ impl> From for MetadataConfig { } } +/// Field configuration types supported in the `PostgreSQL` table schema. +/// +/// Represents different field types that can be configured in the table schema, +/// including vector embeddings, metadata, and system fields. +/// +/// # Variants +/// +/// * `Vector` - Vector embedding field configuration +/// * `Metadata` - Metadata field configuration +/// * `Chunk` - Text content storage field +/// * `ID` - Primary key field #[derive(Clone, Debug)] pub enum FieldConfig { + /// `Vector` - Vector embedding field configuration Vector(VectorConfig), + /// `Metadata` - Metadata field configuration Metadata(MetadataConfig), + /// `Chunk` - Text content storage field Chunk, + /// `ID` - Primary key field ID, } @@ -81,13 +104,67 @@ impl FieldConfig { } } -/// Structure to hold collected values for bulk upsert -#[derive(Default)] -struct BulkUpsertData { +/// Internal structure for managing bulk upsert operations. +/// +/// Collects and organizes data for efficient bulk insertions and updates, +/// grouping related fields for UNNEST-based operations. +struct BulkUpsertData<'a> { ids: Vec, - chunks: Vec, - metadata_fields: BTreeMap>, - vector_fields: BTreeMap>, + chunks: Vec<&'a str>, + metadata_fields: Vec>, + vector_fields: Vec>, + field_mapping: FieldMapping<'a>, +} + +struct FieldMapping<'a> { + metadata_names: Vec<&'a str>, + vector_names: Vec<&'a str>, +} + +impl<'a> BulkUpsertData<'a> { + fn new(fields: &'a [FieldConfig], size: usize) -> Self { + let (metadata_names, vector_names): (Vec<&str>, Vec<&str>) = ( + fields + .iter() + .filter_map(|field| match field { + FieldConfig::Metadata(config) => Some(config.field.as_str()), + _ => None, + }) + .collect(), + fields + .iter() + .filter_map(|field| match field { + FieldConfig::Vector(config) => Some(config.field.as_str()), + _ => None, + }) + .collect(), + ); + + Self { + ids: Vec::with_capacity(size), + chunks: Vec::with_capacity(size), + metadata_fields: vec![Vec::with_capacity(size); metadata_names.len()], + vector_fields: vec![Vec::with_capacity(size); vector_names.len()], + field_mapping: FieldMapping { + metadata_names, + vector_names, + }, + } + } + + fn get_metadata_index(&self, field: &str) -> Option { + self.field_mapping + .metadata_names + .iter() + .position(|&name| name == field) + } + + fn get_vector_index(&self, field: &str) -> Option { + self.field_mapping + .vector_names + .iter() + .position(|&name| name == field) + } } impl PgVector { @@ -182,9 +259,15 @@ impl PgVector { let mut tx = pool.begin().await?; let bulk_data = self.prepare_bulk_data(nodes)?; - let sql = self.generate_unnest_upsert_sql()?; - let query = self.bind_bulk_data_to_query(sqlx::query(&sql), &bulk_data)?; + let sql = self + .sql_stmt_bulk_insert + .get() + .ok_or_else(|| anyhow!("SQL bulk insert statement not set"))?; + + tracing::info!("Sql statement :: {:#?}", sql); + + let query = self.bind_bulk_data_to_query(sqlx::query(sql), &bulk_data)?; query .execute(&mut *tx) @@ -198,30 +281,32 @@ impl PgVector { /// Prepares data from nodes into vectors for bulk processing. #[allow(clippy::implicit_clone)] - fn prepare_bulk_data(&self, nodes: &[Node]) -> Result { - let mut bulk_data = BulkUpsertData::default(); + fn prepare_bulk_data<'a>(&'a self, nodes: &'a [Node]) -> Result> { + let mut bulk_data = BulkUpsertData::new(&self.fields, nodes.len()); for node in nodes { bulk_data.ids.push(node.id()); - bulk_data.chunks.push(node.chunk.clone()); + bulk_data.chunks.push(node.chunk.as_str()); for field in &self.fields { match field { FieldConfig::Metadata(config) => { - let value = node.metadata.get(&config.original_field).ok_or_else(|| { - anyhow!("Metadata field {} not found", config.original_field) - })?; + let idx = bulk_data + .get_metadata_index(config.field.as_str()) + .ok_or_else(|| anyhow!("Invalid metadata field"))?; - let entry = bulk_data - .metadata_fields - .entry(config.field.clone()) - .or_default(); + let value = node + .metadata + .get(&config.original_field) + .ok_or_else(|| anyhow!("Missing metadata field"))?; - let mut metadata_map = BTreeMap::new(); - metadata_map.insert(config.original_field.clone(), value.clone()); - entry.push(serde_json::to_value(metadata_map)?); + bulk_data.metadata_fields[idx].push(value.clone()); } FieldConfig::Vector(config) => { + let idx = bulk_data + .get_vector_index(config.field.as_str()) + .ok_or_else(|| anyhow!("Invalid vector field"))?; + let data = node .vectors .as_ref() @@ -229,13 +314,9 @@ impl PgVector { .map(|v| v.to_vec()) .unwrap_or_default(); - bulk_data - .vector_fields - .entry(config.field.clone()) - .or_default() - .push(ExtPgVector::Vector::from(data)); + bulk_data.vector_fields[idx].push(ExtPgVector::Vector::from(data)); } - _ => continue, // ID and Chunk already handled + _ => continue, } } } @@ -252,7 +333,7 @@ impl PgVector { /// # Errors /// /// Returns an error if `self.fields` is empty, as no valid SQL can be generated. - fn generate_unnest_upsert_sql(&self) -> Result { + pub(crate) fn generate_unnest_upsert_sql(&self) -> Result { if self.fields.is_empty() { return Err(anyhow!("Cannot generate upsert SQL with empty fields")); } @@ -319,24 +400,24 @@ impl PgVector { query = match field { FieldConfig::ID => query.bind(&bulk_data.ids), FieldConfig::Chunk => query.bind(&bulk_data.chunks), + FieldConfig::Vector(config) => { + let idx = bulk_data + .get_vector_index(config.field.as_str()) + .ok_or_else(|| { + anyhow!("Vector field {} not found in bulk data", config.field) + })?; + query.bind(&bulk_data.vector_fields[idx]) + } FieldConfig::Metadata(config) => { - let values = bulk_data - .metadata_fields - .get(&config.field) + let idx = bulk_data + .get_metadata_index(config.field.as_str()) .ok_or_else(|| { anyhow!("Metadata field {} not found in bulk data", config.field) })?; - query.bind(values) - } - FieldConfig::Vector(config) => { - let vectors = bulk_data.vector_fields.get(&config.field).ok_or_else(|| { - anyhow!("Vector field {} not found in bulk data", config.field) - })?; - query.bind(vectors) + query.bind(&bulk_data.metadata_fields[idx]) } }; } - Ok(query) } @@ -364,9 +445,29 @@ impl PgVector { impl PgVector { pub(crate) fn normalize_field_name(field: &str) -> String { - field - .to_lowercase() - .replace(|c: char| !c.is_alphanumeric(), "_") + // Define the special characters as an array + let special_chars: [char; 4] = ['(', '[', '{', '<']; + + // First split by special characters and take the first part + let base_text = field + .split(|c| special_chars.contains(&c)) + .next() + .unwrap_or(field) + .trim(); + + // Split by whitespace, take up to 3 words, convert to lowercase + let normalized = base_text + .split_whitespace() + .take(3) + .collect::>() + .join("_") + .to_lowercase(); + + // Ensure the result only contains alphanumeric chars and underscores + normalized + .chars() + .filter(|c| c.is_alphanumeric() || *c == '_') + .collect() } pub(crate) fn is_valid_identifier(identifier: &str) -> bool { @@ -440,7 +541,7 @@ impl PgVector { /// or creates and initializes it if it is not. /// /// # Errors - /// This function will return an error if pool creation fails. + /// This function will return an error if pool creation fails. pub async fn pool_get_or_initialize(&self) -> Result<&PgPool> { if let Some(pool) = self.connection_pool.get() { return Ok(pool); diff --git a/swiftide-test-utils/Cargo.toml b/swiftide-test-utils/Cargo.toml index b0a344db..840729f7 100644 --- a/swiftide-test-utils/Cargo.toml +++ b/swiftide-test-utils/Cargo.toml @@ -27,8 +27,6 @@ wiremock = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } -tempfile = { workspace = true } -portpicker = { workspace = true } [features] default = ["test-utils"] diff --git a/swiftide-test-utils/src/test_utils.rs b/swiftide-test-utils/src/test_utils.rs index b95d670c..3ce54df6 100644 --- a/swiftide-test-utils/src/test_utils.rs +++ b/swiftide-test-utils/src/test_utils.rs @@ -3,7 +3,7 @@ use serde_json::json; use testcontainers::{ - core::{wait::HttpWaitStrategy, IntoContainerPort, Mount, WaitFor}, + core::{wait::HttpWaitStrategy, IntoContainerPort, WaitFor}, runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt, }; @@ -75,23 +75,20 @@ pub async fn start_redis() -> (ContainerAsync, String) { /// Setup Postgres container. /// Returns container server and `server_url`. pub async fn start_postgres() -> (ContainerAsync, String) { - // Find a free port on the host for Postgres to use - let host_port = portpicker::pick_unused_port().expect("No available free port on the host"); - let postgres = testcontainers::GenericImage::new("pgvector/pgvector", "pg17") .with_wait_for(WaitFor::message_on_stdout( "database system is ready to accept connections", )) - .with_mapped_port(host_port, 5432.tcp()) + .with_exposed_port(5432.tcp()) .with_env_var("POSTGRES_USER", "myuser") .with_env_var("POSTGRES_PASSWORD", "mypassword") .with_env_var("POSTGRES_DB", "mydatabase") - .with_mount(Mount::tmpfs_mount("/var/lib/postgresql/data")) .start() .await .expect("Failed to start Postgres container"); // Construct the connection URL using the dynamically assigned port + let host_port = postgres.get_host_port_ipv4(5432).await.unwrap(); let postgres_url = format!( "postgresql://myuser:mypassword@127.0.0.1:{}/mydatabase", host_port