diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6c5143df..a3bd3756 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -67,7 +67,7 @@ jobs: RUSTC_WRAPPER: sccache SCCACHE_GHA_ENABLED: "true" - name: Check snapshots - run: cargo insta test --workspace --features full --check + run: cargo insta test --workspace --features full --check --lib --bins env: RUSTC_WRAPPER: sccache SCCACHE_GHA_ENABLED: "true" diff --git a/CHANGELOG.md b/CHANGELOG.md index ddbbb67c..0d72a016 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - `ScoredMatch` struct exposing both skill index and cosine similarity score from matcher backends - `IntentClassification` type (`skill_name`, `confidence`, `params`) with `JsonSchema` derive for schema-enforced LLM responses - `disambiguation_threshold` in `[skills]` config section (default: 0.05) with `with_disambiguation_threshold()` builder on `Agent` +- DocumentLoader trait with text/markdown file loader in zeph-memory (#469) +- Text splitter with configurable chunk size, overlap, and sentence-aware splitting (#470) +- PDF document loader, feature-gated behind `pdf` (#471) +- Document ingestion pipeline: load, split, embed, store via Qdrant (#472) +- File size guard (50 MiB default) and path canonicalization for document loaders ## [0.10.0] - 2026-02-18 diff --git a/Cargo.lock b/Cargo.lock index 71c757e9..24ec24d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "adobe-cmap-parser" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8abfa9a4688de8fc9f42b3f013b6fffec18ed8a554f5f113577e0b9b3212a3" +dependencies = [ + "pom", +] + [[package]] name = "aead" version = "0.5.2" @@ -251,9 
+260,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-lc-rs" -version = "1.15.4" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b7b6141e96a8c160799cc2d5adecd5cbbe5054cb8c7c4af53da0f83bb7ad256" +checksum = "d9a7b350e3bb1767102698302bc37256cbd48422809984b98d292c40e2579aa9" dependencies = [ "aws-lc-sys", "zeroize", @@ -601,9 +610,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.19.1" +version = "3.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +checksum = "5c6f81257d10a0f602a294ae4182251151ff97dbb504ef9afcdda4a64b24d9b4" [[package]] name = "bytemuck" @@ -1703,6 +1712,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "euclid" +version = "0.20.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bb7ef65b3777a325d1eeefefab5b6d4959da54747e33bd6258e789640f307ad" +dependencies = [ + "num-traits", +] + [[package]] name = "euclid" version = "0.22.13" @@ -3327,6 +3345,24 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lopdf" +version = "0.34.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5c8ecfc6c72051981c0459f75ccc585e7ff67c70829560cda8e647882a9abff" +dependencies = [ + "encoding_rs", + "flate2", + "indexmap 2.13.0", + "itoa", + "log", + "md-5", + "nom", + "rangemap", + "time", + "weezl", +] + [[package]] name = "lru" version = "0.16.3" @@ -4078,6 +4114,21 @@ dependencies = [ "hmac", ] +[[package]] +name = "pdf-extract" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbb3a5387b94b9053c1e69d8abfd4dd6dae7afda65a5c5279bc1f42ab39df575" +dependencies = [ + "adobe-cmap-parser", + 
"encoding_rs", + "euclid 0.20.14", + "lopdf", + "postscript", + "type1-encoding-parser", + "unicode-normalization", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -4339,12 +4390,24 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "pom" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60f6ce597ecdcc9a098e7fddacb1065093a3d66446fa16c675e7e71d1b5c28e6" + [[package]] name = "portable-atomic" version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" +[[package]] +name = "postscript" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78451badbdaebaf17f053fd9152b3ffb33b516104eacb45e7864aaa9c712f306" + [[package]] name = "potential_utf" version = "0.1.4" @@ -4763,6 +4826,12 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rangemap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68" + [[package]] name = "ratatui" version = "0.30.0" @@ -7156,6 +7225,15 @@ dependencies = [ "rustc-hash 2.1.1", ] +[[package]] +name = "type1-encoding-parser" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3d6cc09e1a99c7e01f2afe4953789311a1c50baebbdac5b477ecf78e2e92a5b" +dependencies = [ + "pom", +] + [[package]] name = "typed-path" version = "0.12.3" @@ -7694,6 +7772,12 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "weezl" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28ac98ddc8b9274cb41bb4d9d4d5c425b6020c50c46f25559911905610b4a88" + [[package]] name = "wezterm-bidi" version = "0.2.3" @@ -7760,7 +7844,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"7012add459f951456ec9d6c7e6fc340b1ce15d6fc9629f8c42853412c029e57e" dependencies = [ "bitflags 1.3.2", - "euclid", + "euclid 0.22.13", "lazy_static", "serde", "wezterm-dynamic", @@ -8623,9 +8707,12 @@ version = "0.10.0" dependencies = [ "anyhow", "criterion", + "pdf-extract", + "proptest", "qdrant-client", "serde_json", "sqlx", + "tempfile", "testcontainers", "thiserror 2.0.18", "tokio", diff --git a/Cargo.toml b/Cargo.toml index a4612579..ddd47064 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,8 @@ futures-core = "0.3" notify = "8" notify-debouncer-mini = "0.7" ollama-rs = { version = "0.3", default-features = false, features = ["rustls", "stream"] } +pdf-extract = "0.7" +proptest = "1.6" pulldown-cmark = "0.13" qdrant-client = { version = "1.16", default-features = false } ratatui = "0.30" @@ -118,6 +120,7 @@ gateway = ["dep:zeph-gateway"] daemon = ["zeph-core/daemon"] scheduler = ["dep:zeph-scheduler"] otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:tracing-opentelemetry"] +pdf = ["zeph-memory/pdf"] mock = ["zeph-llm/mock", "zeph-memory/mock"] [dependencies] diff --git a/README.md b/README.md index f076d0a5..3a77910a 100644 --- a/README.md +++ b/README.md @@ -322,6 +322,7 @@ Always compiled in: `openai`, `compatible`, `orchestrator`, `router`, `self-lear | `index` | AST-based code indexing | | `gateway` | HTTP webhook ingestion | | `daemon` | Component supervisor | +| `pdf` | PDF document loading for RAG | | `scheduler` | Cron-based periodic tasks | | `otel` | OpenTelemetry OTLP export | | `full` | Everything above | diff --git a/crates/zeph-memory/Cargo.toml b/crates/zeph-memory/Cargo.toml index abb798ea..60522463 100644 --- a/crates/zeph-memory/Cargo.toml +++ b/crates/zeph-memory/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true repository.workspace = true [dependencies] +pdf-extract = { workspace = true, optional = true } qdrant-client = { workspace = true, features = ["serde"] } serde_json.workspace = true sqlx = 
{ workspace = true, features = ["runtime-tokio", "sqlite", "migrate"] } @@ -16,17 +17,20 @@ tracing.workspace = true uuid = { workspace = true, features = ["v4"] } zeph-llm.workspace = true -[[bench]] -name = "token_estimation" -harness = false - [features] default = [] mock = [] +pdf = ["dep:pdf-extract"] + +[[bench]] +name = "token_estimation" +harness = false [dev-dependencies] anyhow.workspace = true criterion.workspace = true +proptest.workspace = true +tempfile.workspace = true testcontainers.workspace = true tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tokio-stream.workspace = true diff --git a/crates/zeph-memory/README.md b/crates/zeph-memory/README.md index ce790928..89abf3e7 100644 --- a/crates/zeph-memory/README.md +++ b/crates/zeph-memory/README.md @@ -6,6 +6,8 @@ SQLite-backed conversation persistence with Qdrant vector search. Provides durable conversation storage via SQLite and semantic retrieval through Qdrant vector search. The `SemanticMemory` orchestrator combines both backends, enabling the agent to recall relevant context from past conversations using embedding similarity. +Includes a document ingestion subsystem for loading, chunking, and storing user documents (text, Markdown, PDF) into Qdrant for RAG workflows. 
+ ## Key modules | Module | Description | @@ -14,16 +16,32 @@ Provides durable conversation storage via SQLite and semantic retrieval through | `qdrant` | Qdrant client for vector upsert and search | | `qdrant_ops` | `QdrantOps` — high-level Qdrant operations | | `semantic` | `SemanticMemory` — orchestrates SQLite + Qdrant | +| `document` | Document loading, splitting, and ingestion pipeline | +| `document::loader` | `TextLoader` (.txt/.md), `PdfLoader` (feature-gated: `pdf`) | +| `document::splitter` | `TextSplitter` with configurable chunking | +| `document::pipeline` | `IngestionPipeline` — load, split, embed, store via Qdrant | +| `vector_store` | `VectorStore` trait and `VectorPoint` types | +| `embedding_store` | `EmbeddingStore` — high-level embedding CRUD | | `types` | `ConversationId`, `MessageId`, shared types | | `error` | `MemoryError` — unified error type | -**Re-exports:** `MemoryError`, `QdrantOps`, `ConversationId`, `MessageId` +**Re-exports:** `MemoryError`, `QdrantOps`, `ConversationId`, `MessageId`, `Document`, `DocumentLoader`, `TextLoader`, `TextSplitter`, `IngestionPipeline`, `Chunk`, `SplitterConfig`, `DocumentError`, `DocumentMetadata`, `PdfLoader` (behind `pdf` feature) + +## Features + +| Feature | Description | +|---------|-------------| +| `pdf` | PDF document loading via `pdf-extract` | +| `mock` | In-memory `VectorStore` implementation for testing | ## Usage ```toml [dependencies] zeph-memory = { path = "../zeph-memory" } + +# With PDF support +zeph-memory = { path = "../zeph-memory", features = ["pdf"] } ``` ## License diff --git a/crates/zeph-memory/src/document/error.rs b/crates/zeph-memory/src/document/error.rs new file mode 100644 index 00000000..125faa9a --- /dev/null +++ b/crates/zeph-memory/src/document/error.rs @@ -0,0 +1,21 @@ +#[derive(Debug, thiserror::Error)] +pub enum DocumentError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("unsupported format: {0}")] + UnsupportedFormat(String), + + 
#[error("file too large: {0} bytes")] + FileTooLarge(u64), + + #[cfg(feature = "pdf")] + #[error("PDF error: {0}")] + Pdf(String), + + #[error("embedding failed: {0}")] + Embedding(#[from] zeph_llm::LlmError), + + #[error("storage error: {0}")] + Storage(#[from] crate::error::MemoryError), +} diff --git a/crates/zeph-memory/src/document/loader/mod.rs b/crates/zeph-memory/src/document/loader/mod.rs new file mode 100644 index 00000000..cdcb5862 --- /dev/null +++ b/crates/zeph-memory/src/document/loader/mod.rs @@ -0,0 +1,7 @@ +mod text; +pub use text::TextLoader; + +#[cfg(feature = "pdf")] +mod pdf; +#[cfg(feature = "pdf")] +pub use pdf::PdfLoader; diff --git a/crates/zeph-memory/src/document/loader/pdf.rs b/crates/zeph-memory/src/document/loader/pdf.rs new file mode 100644 index 00000000..efcfedb5 --- /dev/null +++ b/crates/zeph-memory/src/document/loader/pdf.rs @@ -0,0 +1,59 @@ +use std::collections::HashMap; +use std::path::Path; +use std::pin::Pin; + +use super::super::{ + DEFAULT_MAX_FILE_SIZE, Document, DocumentError, DocumentLoader, DocumentMetadata, +}; + +pub struct PdfLoader { + pub max_file_size: u64, +} + +impl Default for PdfLoader { + fn default() -> Self { + Self { + max_file_size: DEFAULT_MAX_FILE_SIZE, + } + } +} + +impl DocumentLoader for PdfLoader { + fn load( + &self, + path: &Path, + ) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<Document>, DocumentError>> + Send + '_>> + { + let path = path.to_path_buf(); + let max_size = self.max_file_size; + Box::pin(async move { + let path = std::fs::canonicalize(&path)?; + + let meta = tokio::fs::metadata(&path).await?; + if meta.len() > max_size { + return Err(DocumentError::FileTooLarge(meta.len())); + } + + let source = path.display().to_string(); + let path_buf = path.to_path_buf(); + let content = tokio::task::spawn_blocking(move || { + pdf_extract::extract_text(&path_buf).map_err(|e| DocumentError::Pdf(e.to_string())) + }) + .await + .map_err(|e| DocumentError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))??; + + Ok(vec![Document { 
content, + metadata: DocumentMetadata { + source, + content_type: "application/pdf".to_owned(), + extra: HashMap::new(), + }, + }]) + }) + } + + fn supported_extensions(&self) -> &[&str] { + &["pdf"] + } +} diff --git a/crates/zeph-memory/src/document/loader/text.rs b/crates/zeph-memory/src/document/loader/text.rs new file mode 100644 index 00000000..c11ad7ad --- /dev/null +++ b/crates/zeph-memory/src/document/loader/text.rs @@ -0,0 +1,157 @@ +use std::collections::HashMap; +use std::path::Path; +use std::pin::Pin; + +use super::super::{ + DEFAULT_MAX_FILE_SIZE, Document, DocumentError, DocumentLoader, DocumentMetadata, +}; + +pub struct TextLoader { + pub max_file_size: u64, +} + +impl Default for TextLoader { + fn default() -> Self { + Self { + max_file_size: DEFAULT_MAX_FILE_SIZE, + } + } +} + +impl DocumentLoader for TextLoader { + fn load( + &self, + path: &Path, + ) -> Pin<Box<dyn std::future::Future<Output = Result<Vec<Document>, DocumentError>> + Send + '_>> + { + let path = path.to_path_buf(); + let max_size = self.max_file_size; + Box::pin(async move { + let path = std::fs::canonicalize(&path)?; + + let meta = tokio::fs::metadata(&path).await?; + if meta.len() > max_size { + return Err(DocumentError::FileTooLarge(meta.len())); + } + + let ext = path.extension().and_then(|e| e.to_str()).unwrap_or(""); + + let content_type = match ext { + "md" | "markdown" => "text/markdown", + _ => "text/plain", + }; + + let content = tokio::fs::read_to_string(&path).await?; + + Ok(vec![Document { + content, + metadata: DocumentMetadata { + source: path.display().to_string(), + content_type: content_type.to_owned(), + extra: HashMap::new(), + }, + }]) + }) + } + + fn supported_extensions(&self) -> &[&str] { + &["txt", "md", "markdown"] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn load_text_file() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("test.txt"); + std::fs::write(&file, "hello world").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); 
+ assert_eq!(docs.len(), 1); + assert_eq!(docs[0].content, "hello world"); + assert_eq!(docs[0].metadata.content_type, "text/plain"); + } + + #[tokio::test] + async fn load_markdown_file() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("readme.md"); + std::fs::write(&file, "# Title").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); + assert_eq!(docs[0].metadata.content_type, "text/markdown"); + } + + #[tokio::test] + async fn load_nonexistent_file() { + let result = TextLoader::default() + .load(Path::new("/nonexistent/file.txt")) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn load_empty_file() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("empty.txt"); + std::fs::write(&file, "").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); + assert_eq!(docs.len(), 1); + assert!(docs[0].content.is_empty()); + } + + #[tokio::test] + async fn load_markdown_extension_variant() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("doc.markdown"); + std::fs::write(&file, "content").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); + assert_eq!(docs[0].metadata.content_type, "text/markdown"); + } + + #[tokio::test] + async fn unknown_extension_treated_as_plain_text() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("data.csv"); + std::fs::write(&file, "a,b,c").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); + assert_eq!(docs[0].metadata.content_type, "text/plain"); + } + + #[test] + fn supported_extensions_list() { + let loader = TextLoader::default(); + let exts = loader.supported_extensions(); + assert!(exts.contains(&"txt")); + assert!(exts.contains(&"md")); + assert!(exts.contains(&"markdown")); + } + + #[tokio::test] + async fn metadata_source_is_canonical() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("test.txt"); + 
std::fs::write(&file, "data").unwrap(); + + let docs = TextLoader::default().load(&file).await.unwrap(); + let canonical = std::fs::canonicalize(&file).unwrap(); + assert_eq!(docs[0].metadata.source, canonical.display().to_string()); + } + + #[tokio::test] + async fn file_too_large_rejected() { + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("big.txt"); + std::fs::write(&file, "x").unwrap(); + + let loader = TextLoader { max_file_size: 0 }; + let result = loader.load(&file).await; + assert!(matches!(result, Err(DocumentError::FileTooLarge(_)))); + } +} diff --git a/crates/zeph-memory/src/document/mod.rs b/crates/zeph-memory/src/document/mod.rs new file mode 100644 index 00000000..0698acee --- /dev/null +++ b/crates/zeph-memory/src/document/mod.rs @@ -0,0 +1,28 @@ +pub mod error; +pub mod loader; +pub mod pipeline; +pub mod splitter; +pub mod types; + +pub use error::DocumentError; +pub use loader::TextLoader; +pub use pipeline::IngestionPipeline; +pub use splitter::{SplitterConfig, TextSplitter}; +pub use types::{Chunk, Document, DocumentMetadata}; + +#[cfg(feature = "pdf")] +pub use loader::PdfLoader; + +/// Default maximum file size: 50 MiB. 
+pub const DEFAULT_MAX_FILE_SIZE: u64 = 50 * 1024 * 1024; + +pub trait DocumentLoader: Send + Sync { + fn load( + &self, + path: &std::path::Path, + ) -> std::pin::Pin< + Box<dyn std::future::Future<Output = Result<Vec<Document>, DocumentError>> + Send + '_>, + >; + + fn supported_extensions(&self) -> &[&str]; +} diff --git a/crates/zeph-memory/src/document/pipeline.rs b/crates/zeph-memory/src/document/pipeline.rs new file mode 100644 index 00000000..ba8743fb --- /dev/null +++ b/crates/zeph-memory/src/document/pipeline.rs @@ -0,0 +1,83 @@ +use qdrant_client::qdrant::PointStruct; +use serde_json::json; +use uuid::Uuid; + +use super::{Document, DocumentError, DocumentLoader, TextSplitter}; +use crate::QdrantOps; + +pub struct IngestionPipeline { + splitter: TextSplitter, + qdrant: QdrantOps, + collection: String, + embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>, +} + +impl IngestionPipeline { + pub fn new( + splitter: TextSplitter, + qdrant: QdrantOps, + collection: impl Into<String>, + embed_fn: Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync>, + ) -> Self { + Self { + splitter, + qdrant, + collection: collection.into(), + embed_fn, + } + } + + /// Ingest a document: split -> embed -> store in Qdrant. Returns chunk count. + /// + /// # Errors + /// + /// Returns an error if embedding or Qdrant storage fails. 
pub async fn ingest(&self, document: Document) -> Result<usize, DocumentError> { + let chunks = self.splitter.split(&document); + if chunks.is_empty() { + return Ok(0); + } + + let mut points = Vec::with_capacity(chunks.len()); + for chunk in &chunks { + let vector = (self.embed_fn)(&chunk.content).await?; + let payload = QdrantOps::json_to_payload(json!({ + "source": chunk.metadata.source, + "content_type": chunk.metadata.content_type, + "chunk_index": chunk.chunk_index, + "content": chunk.content, + })) + .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Json(e)))?; + + points.push(PointStruct::new( + Uuid::new_v4().to_string(), + vector, + payload, + )); + } + + let count = points.len(); + self.qdrant + .upsert(&self.collection, points) + .await + .map_err(|e| DocumentError::Storage(crate::error::MemoryError::Qdrant(e)))?; + + Ok(count) + } + + /// # Errors + /// + /// Returns an error if loading, embedding, or storage fails. + pub async fn load_and_ingest( + &self, + loader: &(dyn DocumentLoader + '_), + path: &std::path::Path, + ) -> Result<usize, DocumentError> { + let documents = loader.load(path).await?; + let mut total = 0; + for doc in documents { + total += self.ingest(doc).await?; + } + Ok(total) + } +} diff --git a/crates/zeph-memory/src/document/splitter.rs b/crates/zeph-memory/src/document/splitter.rs new file mode 100644 index 00000000..6b148b1c --- /dev/null +++ b/crates/zeph-memory/src/document/splitter.rs @@ -0,0 +1,381 @@ +use super::types::{Chunk, Document}; + +#[derive(Debug, Clone)] +pub struct SplitterConfig { + pub chunk_size: usize, + pub chunk_overlap: usize, + pub sentence_aware: bool, +} + +impl Default for SplitterConfig { + fn default() -> Self { + Self { + chunk_size: 1000, + chunk_overlap: 200, + sentence_aware: true, + } + } +} + +pub struct TextSplitter { + config: SplitterConfig, +} + +impl TextSplitter { + #[must_use] + pub fn new(config: SplitterConfig) -> Self { + Self { config } + } + + #[must_use] + pub fn split(&self, document: &Document) -> Vec<Chunk> { 
let text = &document.content; + if text.is_empty() { + return Vec::new(); + } + + let pieces = if self.config.sentence_aware { + split_sentences(text) + } else { + split_chars(text, self.config.chunk_size, self.config.chunk_overlap) + }; + + if self.config.sentence_aware { + let chunks = + merge_sentences(&pieces, self.config.chunk_size, self.config.chunk_overlap); + chunks + .into_iter() + .enumerate() + .map(|(i, content)| Chunk { + content, + metadata: document.metadata.clone(), + chunk_index: i, + }) + .collect() + } else { + pieces + .into_iter() + .enumerate() + .map(|(i, content)| Chunk { + content, + metadata: document.metadata.clone(), + chunk_index: i, + }) + .collect() + } + } +} + +fn split_sentences(text: &str) -> Vec<String> { + let mut sentences = Vec::new(); + let mut current = String::new(); + + let chars: Vec<char> = text.chars().collect(); + let mut i = 0; + + while i < chars.len() { + current.push(chars[i]); + + // Split on paragraph breaks + if chars[i] == '\n' && i + 1 < chars.len() && chars[i + 1] == '\n' { + current.push(chars[i + 1]); + i += 1; + if !current.trim().is_empty() { + sentences.push(std::mem::take(&mut current)); + } + } + // Split on sentence endings followed by space + else if (chars[i] == '.' || chars[i] == '?' || chars[i] == '!') + && i + 1 < chars.len() + && chars[i + 1] == ' ' + && !current.trim().is_empty() + { + sentences.push(std::mem::take(&mut current)); + } + + i += 1; + } + + if !current.trim().is_empty() { + sentences.push(current); + } + + sentences +} + +/// Merge sentences into chunks, respecting size and overlap. +fn merge_sentences(sentences: &[String], chunk_size: usize, chunk_overlap: usize) -> Vec<String> { + let mut chunks = Vec::new(); + let mut current = String::new(); + // Sliding window: track only the sentence indices contributing to the current chunk. 
+ let mut window_start = 0; + + for (idx, sentence) in sentences.iter().enumerate() { + if !current.is_empty() && current.len() + sentence.len() > chunk_size { + chunks.push(current.clone()); + + // Build overlap from recent sentences (walk backwards from current window) + current.clear(); + let mut overlap_len = 0; + let mut overlap_start = idx; + for i in (window_start..idx).rev() { + if overlap_len + sentences[i].len() > chunk_overlap { + break; + } + overlap_len += sentences[i].len(); + overlap_start = i; + } + for s in &sentences[overlap_start..idx] { + current.push_str(s); + } + window_start = overlap_start; + } + + current.push_str(sentence); + } + + if !current.is_empty() { + chunks.push(current); + } + + chunks +} + +fn split_chars(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> { + let mut chunks = Vec::new(); + let chars: Vec<char> = text.chars().collect(); + let step = chunk_size.saturating_sub(overlap).max(1); + let mut start = 0; + + while start < chars.len() { + let end = (start + chunk_size).min(chars.len()); + chunks.push(chars[start..end].iter().collect()); + start += step; + } + + chunks +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::document::types::DocumentMetadata; + + fn make_doc(content: &str) -> Document { + Document { + content: content.to_owned(), + metadata: DocumentMetadata { + source: "test".to_owned(), + content_type: "text/plain".to_owned(), + extra: HashMap::new(), + }, + } + } + + #[test] + fn empty_document() { + let splitter = TextSplitter::new(SplitterConfig::default()); + let chunks = splitter.split(&make_doc("")); + assert!(chunks.is_empty()); + } + + #[test] + fn single_small_chunk() { + let splitter = TextSplitter::new(SplitterConfig::default()); + let chunks = splitter.split(&make_doc("Hello world.")); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].chunk_index, 0); + } + + #[test] + fn sentence_aware_splitting() { + let text = "First sentence. Second sentence. 
Third sentence."; + let splitter = TextSplitter::new(SplitterConfig { + chunk_size: 20, + chunk_overlap: 5, + sentence_aware: true, + }); + let chunks = splitter.split(&make_doc(text)); + assert!(chunks.len() > 1); + for (i, chunk) in chunks.iter().enumerate() { + assert_eq!(chunk.chunk_index, i); + } + } + + #[test] + fn char_splitting_with_overlap() { + let text = "abcdefghijklmnopqrstuvwxyz"; + let splitter = TextSplitter::new(SplitterConfig { + chunk_size: 10, + chunk_overlap: 3, + sentence_aware: false, + }); + let chunks = splitter.split(&make_doc(text)); + assert!(chunks.len() > 1); + // Verify overlap: end of chunk N overlaps with start of chunk N+1 + assert_eq!(&chunks[0].content[7..10], &chunks[1].content[..3]); + } + + #[test] + fn metadata_preserved() { + let splitter = TextSplitter::new(SplitterConfig::default()); + let doc = make_doc("Some content."); + let chunks = splitter.split(&doc); + assert_eq!(chunks[0].metadata.source, "test"); + } + + #[test] + fn paragraph_break_splitting() { + let text = "First paragraph.\n\nSecond paragraph."; + let sentences = super::split_sentences(text); + assert_eq!(sentences.len(), 2); + } + + #[test] + fn document_smaller_than_chunk_size() { + let splitter = TextSplitter::new(SplitterConfig { + chunk_size: 1000, + chunk_overlap: 100, + sentence_aware: true, + }); + let chunks = splitter.split(&make_doc("Short text.")); + assert_eq!(chunks.len(), 1); + assert_eq!(chunks[0].content, "Short text."); + } + + #[test] + fn single_sentence_no_trailing_space() { + let sentences = super::split_sentences("Hello world"); + assert_eq!(sentences.len(), 1); + assert_eq!(sentences[0], "Hello world"); + } + + #[test] + fn char_split_no_overlap() { + let chunks = super::split_chars("abcdefghij", 5, 0); + assert_eq!(chunks, vec!["abcde", "fghij"]); + } + + #[test] + fn char_split_full_overlap_makes_progress() { + // overlap >= chunk_size should still make progress (step = max(1, 0)) + let chunks = super::split_chars("abcde", 3, 3); + 
assert!(!chunks.is_empty()); + assert_eq!(chunks[0], "abc"); + } + + #[test] + fn sentence_aware_overlap_includes_previous() { + let text = "A. B. C. D. E."; + let splitter = TextSplitter::new(SplitterConfig { + chunk_size: 5, + chunk_overlap: 3, + sentence_aware: true, + }); + let chunks = splitter.split(&make_doc(text)); + assert!(chunks.len() > 1); + // Later chunks should contain overlap from previous + if chunks.len() >= 2 { + // Second chunk should start with overlap content, not fresh + assert!(!chunks[1].content.is_empty()); + } + } + + #[test] + fn question_mark_splits_sentence() { + let sentences = super::split_sentences("Is this a question? Yes it is."); + assert_eq!(sentences.len(), 2); + } + + #[test] + fn exclamation_splits_sentence() { + let sentences = super::split_sentences("Wow! Amazing."); + assert_eq!(sentences.len(), 2); + } + + mod proptest_splitter { + use super::*; + use proptest::prelude::*; + + proptest! { + #![proptest_config(ProptestConfig::with_cases(1000))] + + #[test] + fn split_never_panics( + content in "\\PC{0,5000}", + chunk_size in 1usize..2000, + chunk_overlap in 0usize..500, + sentence_aware in proptest::bool::ANY, + ) { + let splitter = TextSplitter::new(SplitterConfig { + chunk_size, + chunk_overlap, + sentence_aware, + }); + let doc = make_doc(&content); + let _ = splitter.split(&doc); + } + + #[test] + fn chunks_cover_all_content( + content in "[a-z ]{10,500}", + chunk_size in 10usize..200, + ) { + let splitter = TextSplitter::new(SplitterConfig { + chunk_size, + chunk_overlap: 0, + sentence_aware: false, + }); + let doc = make_doc(&content); + let chunks = splitter.split(&doc); + + if !content.is_empty() { + prop_assert!(!chunks.is_empty()); + } + + let total_chars: usize = chunks.iter().map(|c| c.content.len()).sum(); + prop_assert!(total_chars >= content.len()); + } + + #[test] + fn chunk_indices_sequential( + content in "[a-z. 
]{10,1000}", + chunk_size in 5usize..100, + sentence_aware in proptest::bool::ANY, + ) { + let splitter = TextSplitter::new(SplitterConfig { + chunk_size, + chunk_overlap: 0, + sentence_aware, + }); + let doc = make_doc(&content); + let chunks = splitter.split(&doc); + + for (i, chunk) in chunks.iter().enumerate() { + prop_assert_eq!(chunk.chunk_index, i); + } + } + + #[test] + fn no_empty_chunks( + content in "[a-z. !?]{1,500}", + chunk_size in 1usize..200, + sentence_aware in proptest::bool::ANY, + ) { + let splitter = TextSplitter::new(SplitterConfig { + chunk_size, + chunk_overlap: 0, + sentence_aware, + }); + let doc = make_doc(&content); + let chunks = splitter.split(&doc); + + for chunk in &chunks { + prop_assert!(!chunk.content.is_empty()); + } + } + } + } +} diff --git a/crates/zeph-memory/src/document/types.rs b/crates/zeph-memory/src/document/types.rs new file mode 100644 index 00000000..1eecaf9d --- /dev/null +++ b/crates/zeph-memory/src/document/types.rs @@ -0,0 +1,21 @@ +use std::collections::HashMap; + +#[derive(Debug, Clone)] +pub struct DocumentMetadata { + pub source: String, + pub content_type: String, + pub extra: HashMap<String, String>, +} + +#[derive(Debug, Clone)] +pub struct Document { + pub content: String, + pub metadata: DocumentMetadata, +} + +#[derive(Debug, Clone)] +pub struct Chunk { + pub content: String, + pub metadata: DocumentMetadata, + pub chunk_index: usize, +} diff --git a/crates/zeph-memory/src/lib.rs b/crates/zeph-memory/src/lib.rs index 2d7f9960..609292ed 100644 --- a/crates/zeph-memory/src/lib.rs +++ b/crates/zeph-memory/src/lib.rs @@ -1,5 +1,6 @@ //! SQLite-backed conversation persistence with Qdrant vector search. 
+pub mod document; pub mod embedding_store; pub mod error; #[cfg(feature = "mock")] @@ -10,6 +11,12 @@ pub mod sqlite; pub mod types; pub mod vector_store; +#[cfg(feature = "pdf")] +pub use document::PdfLoader; +pub use document::{ + Chunk, Document, DocumentError, DocumentLoader, DocumentMetadata, IngestionPipeline, + SplitterConfig, TextLoader, TextSplitter, +}; pub use embedding_store::ensure_qdrant_collection; pub use error::MemoryError; pub use qdrant_ops::QdrantOps; diff --git a/crates/zeph-memory/tests/document_integration.rs b/crates/zeph-memory/tests/document_integration.rs new file mode 100644 index 00000000..fb088596 --- /dev/null +++ b/crates/zeph-memory/tests/document_integration.rs @@ -0,0 +1,181 @@ +use std::collections::HashMap; + +use testcontainers::GenericImage; +use testcontainers::core::{ContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use zeph_memory::QdrantOps; +use zeph_memory::document::{ + Document, DocumentMetadata, IngestionPipeline, SplitterConfig, TextLoader, TextSplitter, +}; + +const QDRANT_GRPC_PORT: ContainerPort = ContainerPort::Tcp(6334); +const COLLECTION: &str = "test_documents"; +const VECTOR_SIZE: u64 = 4; + +fn qdrant_image() -> GenericImage { + GenericImage::new("qdrant/qdrant", "v1.16.0") + .with_wait_for(WaitFor::message_on_stdout("gRPC listening")) + .with_exposed_port(QDRANT_GRPC_PORT) +} + +fn fake_embed_fn() -> Box<dyn Fn(&str) -> zeph_llm::provider::EmbedFuture + Send + Sync> { + Box::new(|text: &str| { + let len = text.len() as f32; + Box::pin(async move { Ok(vec![len / 1000.0, 0.1, 0.2, 0.3]) }) + }) +} + +fn make_doc(content: &str) -> Document { + Document { + content: content.to_owned(), + metadata: DocumentMetadata { + source: "test.txt".to_owned(), + content_type: "text/plain".to_owned(), + extra: HashMap::new(), + }, + } +} + +#[tokio::test] +async fn ingest_single_document() { + let container = qdrant_image().start().await.unwrap(); + let port = container.get_host_port_ipv4(6334).await.unwrap(); + let 
qdrant = QdrantOps::new(&format!("http://127.0.0.1:{port}")).unwrap(); + qdrant + .ensure_collection(COLLECTION, VECTOR_SIZE) + .await + .unwrap(); + + let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig::default()), + qdrant.clone(), + COLLECTION, + fake_embed_fn(), + ); + + let doc = make_doc("Hello world. This is a test document."); + let count = pipeline.ingest(doc).await.unwrap(); + assert_eq!(count, 1); // small doc = single chunk + + let results = qdrant + .search(COLLECTION, vec![0.036, 0.1, 0.2, 0.3], 10, None) + .await + .unwrap(); + assert_eq!(results.len(), 1); +} + +#[tokio::test] +async fn ingest_empty_document_returns_zero() { + let container = qdrant_image().start().await.unwrap(); + let port = container.get_host_port_ipv4(6334).await.unwrap(); + let qdrant = QdrantOps::new(&format!("http://127.0.0.1:{port}")).unwrap(); + qdrant + .ensure_collection(COLLECTION, VECTOR_SIZE) + .await + .unwrap(); + + let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig::default()), + qdrant, + COLLECTION, + fake_embed_fn(), + ); + + let count = pipeline.ingest(make_doc("")).await.unwrap(); + assert_eq!(count, 0); +} + +#[tokio::test] +async fn ingest_multi_chunk_document() { + let container = qdrant_image().start().await.unwrap(); + let port = container.get_host_port_ipv4(6334).await.unwrap(); + let qdrant = QdrantOps::new(&format!("http://127.0.0.1:{port}")).unwrap(); + qdrant + .ensure_collection(COLLECTION, VECTOR_SIZE) + .await + .unwrap(); + + let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig { + chunk_size: 20, + chunk_overlap: 5, + sentence_aware: true, + }), + qdrant.clone(), + COLLECTION, + fake_embed_fn(), + ); + + let doc = make_doc("First sentence. Second sentence. Third sentence. 
Fourth sentence."); + let count = pipeline.ingest(doc).await.unwrap(); + assert!(count > 1, "expected multiple chunks, got {count}"); + + let results = qdrant + .search(COLLECTION, vec![0.0, 0.1, 0.2, 0.3], 100, None) + .await + .unwrap(); + assert_eq!(results.len(), count); +} + +#[tokio::test] +async fn load_and_ingest_text_file() { + let container = qdrant_image().start().await.unwrap(); + let port = container.get_host_port_ipv4(6334).await.unwrap(); + let qdrant = QdrantOps::new(&format!("http://127.0.0.1:{port}")).unwrap(); + qdrant + .ensure_collection(COLLECTION, VECTOR_SIZE) + .await + .unwrap(); + + let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig::default()), + qdrant.clone(), + COLLECTION, + fake_embed_fn(), + ); + + let dir = tempfile::tempdir().unwrap(); + let file = dir.path().join("readme.md"); + std::fs::write(&file, "# Hello\n\nThis is a test markdown file.").unwrap(); + + let loader = TextLoader::default(); + let count = pipeline.load_and_ingest(&loader, &file).await.unwrap(); + assert_eq!(count, 1); + + let results = qdrant + .search(COLLECTION, vec![0.0, 0.1, 0.2, 0.3], 10, None) + .await + .unwrap(); + assert_eq!(results.len(), 1); +} + +#[tokio::test] +async fn ingested_chunks_have_correct_payload() { + let container = qdrant_image().start().await.unwrap(); + let port = container.get_host_port_ipv4(6334).await.unwrap(); + let qdrant = QdrantOps::new(&format!("http://127.0.0.1:{port}")).unwrap(); + let collection = "test_payload"; + qdrant + .ensure_collection(collection, VECTOR_SIZE) + .await + .unwrap(); + + let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig::default()), + qdrant.clone(), + collection, + fake_embed_fn(), + ); + + let doc = make_doc("Some content for payload verification."); + pipeline.ingest(doc).await.unwrap(); + + let all = qdrant.scroll_all(collection, "source").await.unwrap(); + assert_eq!(all.len(), 1); + + let entry = all.values().next().unwrap(); + 
assert_eq!(entry.get("source").unwrap(), "test.txt"); + assert_eq!(entry.get("content_type").unwrap(), "text/plain"); + assert!(entry.contains_key("content")); + // chunk_index is stored as integer, scroll_all only extracts string fields +} diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index 7034e420..99c89d30 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -32,6 +32,7 @@ - [HTTP Gateway](guide/gateway.md) - [Daemon Supervisor](guide/daemon.md) - [Cron Scheduler](guide/scheduler.md) +- [Document Loaders](guide/document-loaders.md) # Architecture diff --git a/docs/src/architecture/crates.md b/docs/src/architecture/crates.md index 6a582488..f1e91eb8 100644 --- a/docs/src/architecture/crates.md +++ b/docs/src/architecture/crates.md @@ -54,6 +54,11 @@ SQLite-backed conversation persistence with Qdrant vector search. - `QdrantStore` — vector storage and cosine similarity search with `MessageKind` enum (`Regular` | `Summary`) for payload classification - `SemanticMemory

` — orchestrator coordinating SQLite + Qdrant + LlmProvider - Automatic collection creation, graceful degradation without Qdrant +- `DocumentLoader` trait — async document loading with `load(&Path)` returning `Vec`, dyn-compatible via `Pin>` +- `TextLoader` — plain text and markdown loader (`.txt`, `.md`, `.markdown`) with configurable `max_file_size` (50 MiB default) and path canonicalization +- `PdfLoader` — PDF text extraction via `pdf-extract` with `spawn_blocking` (feature-gated: `pdf`) +- `TextSplitter` — configurable text chunking with `chunk_size`, `chunk_overlap`, and sentence-aware splitting +- `IngestionPipeline` — document ingestion orchestrator: load → split → embed → store via `QdrantOps` ## zeph-channels diff --git a/docs/src/feature-flags.md b/docs/src/feature-flags.md index 3f9c17f1..3f9eefd7 100644 --- a/docs/src/feature-flags.md +++ b/docs/src/feature-flags.md @@ -31,6 +31,7 @@ Zeph uses Cargo feature flags to control optional functionality. As of M26, eigh | `daemon` | Daemon supervisor with component lifecycle, PID file, and health monitoring ([guide](guide/daemon.md)) | | `scheduler` | Cron-based periodic task scheduler with SQLite persistence ([guide](guide/scheduler.md)) | | `otel` | OpenTelemetry tracing export via OTLP/gRPC ([guide](guide/observability.md)) | +| `pdf` | PDF document loading via [pdf-extract](https://crates.io/crates/pdf-extract) for the document ingestion pipeline | | `mock` | Mock providers and channels for testing | ## Build Examples diff --git a/docs/src/guide/document-loaders.md b/docs/src/guide/document-loaders.md new file mode 100644 index 00000000..20c4db90 --- /dev/null +++ b/docs/src/guide/document-loaders.md @@ -0,0 +1,82 @@ +# Document Loaders + +Zeph supports ingesting user documents (plain text, Markdown, PDF) for retrieval-augmented generation. Documents are loaded, split into chunks, embedded, and stored in Qdrant for semantic recall. 
+
+## DocumentLoader Trait
+
+All loaders implement `DocumentLoader`:
+
+```rust
+pub trait DocumentLoader: Send + Sync {
+    fn load(&self, path: &Path) -> Pin<Box<dyn Future<Output = Result<Vec<Document>, DocumentError>> + Send + '_>>;
+    fn supported_extensions(&self) -> &[&str];
+}
+```
+
+Each `Document` contains `content: String` and `metadata: DocumentMetadata` (source path, content type, extra fields).
+
+## TextLoader
+
+Loads `.txt`, `.md`, and `.markdown` files. Always available (no feature gate).
+
+- Reads files via `tokio::fs::read_to_string`
+- Canonicalizes paths via `std::fs::canonicalize` before reading
+- Rejects files exceeding `max_file_size` (default 50 MiB) with `DocumentError::FileTooLarge`
+- Sets `content_type` to `text/markdown` for `.md`/`.markdown`, `text/plain` otherwise
+
+```rust
+let loader = TextLoader::default();
+let docs = loader.load(Path::new("notes.md")).await?;
+```
+
+## PdfLoader
+
+Extracts text from PDF files using `pdf-extract`. Requires the `pdf` feature:
+
+```bash
+cargo build --features pdf
+```
+
+Sync extraction is wrapped in `tokio::task::spawn_blocking`. Same `max_file_size` and path canonicalization guards as `TextLoader`.
+
+## TextSplitter
+
+Splits documents into chunks for embedding. Configurable via `SplitterConfig`:
+
+| Parameter | Default | Description |
+|-----------|---------|-------------|
+| `chunk_size` | 1000 | Maximum characters per chunk |
+| `chunk_overlap` | 200 | Overlap between consecutive chunks |
+| `sentence_aware` | true | Split on sentence boundaries (`. `, `? `, `! `, `\n\n`) |
+
+When `sentence_aware` is false, splits on character boundaries with overlap.
+
+```rust
+let splitter = TextSplitter::new(SplitterConfig {
+    chunk_size: 500,
+    chunk_overlap: 100,
+    sentence_aware: true,
+});
+let chunks = splitter.split(&document);
+```
+
+## IngestionPipeline
+
+Orchestrates the full flow: load → split → embed → store.
+ +```rust +let pipeline = IngestionPipeline::new( + TextSplitter::new(SplitterConfig::default()), + qdrant_ops, + "my_documents", + Box::new(provider.embed_fn()), +); + +// Ingest from a loaded document +let chunk_count = pipeline.ingest(document).await?; + +// Or load and ingest in one step +let chunk_count = pipeline.load_and_ingest(&TextLoader::default(), path).await?; +``` + +Each chunk is stored as a Qdrant point with payload fields: `source`, `content_type`, `chunk_index`, `content`.