2 changes: 2 additions & 0 deletions Cargo.lock


113 changes: 42 additions & 71 deletions crates/zeph-index/src/store.rs
@@ -1,11 +1,10 @@
 //! `Qdrant` collection + `SQLite` metadata for code chunks.
 
-use qdrant_client::Qdrant;
 use qdrant_client::qdrant::{
-    CreateCollectionBuilder, CreateFieldIndexCollectionBuilder, DeletePointsBuilder, Distance,
-    FieldType, Filter, PointStruct, PointsIdsList, ScalarQuantizationBuilder, ScoredPoint,
-    SearchPointsBuilder, UpsertPointsBuilder, VectorParamsBuilder,
+    CreateCollectionBuilder, CreateFieldIndexCollectionBuilder, Distance, FieldType, Filter,
+    PointStruct, ScalarQuantizationBuilder, ScoredPoint, VectorParamsBuilder,
 };
+use zeph_memory::QdrantOps;
 
 use crate::error::Result;
 
@@ -14,7 +13,7 @@ const CODE_COLLECTION: &str = "zeph_code_chunks";
 /// `Qdrant` + `SQLite` dual-write store for code chunks.
 #[derive(Clone)]
 pub struct CodeStore {
-    qdrant: Qdrant,
+    ops: QdrantOps,
     collection: String,
     pool: sqlx::SqlitePool,
 }
@@ -49,9 +48,9 @@ impl CodeStore {
     ///
     /// Returns an error if the `Qdrant` client fails to connect.
     pub fn new(qdrant_url: &str, pool: sqlx::SqlitePool) -> Result<Self> {
-        let qdrant = Qdrant::from_url(qdrant_url).build().map_err(Box::new)?;
+        let ops = QdrantOps::new(qdrant_url).map_err(crate::error::IndexError::Qdrant)?;
         Ok(Self {
-            qdrant,
+            ops,
             collection: CODE_COLLECTION.into(),
             pool,
         })
@@ -73,16 +72,12 @@
     ///
     /// Returns an error if `Qdrant` operations fail.
     pub async fn ensure_collection(&self, vector_size: u64) -> Result<()> {
-        if self
-            .qdrant
-            .collection_exists(&self.collection)
-            .await
-            .map_err(Box::new)?
-        {
+        if self.ops.collection_exists(&self.collection).await? {
             return Ok(());
         }
 
-        self.qdrant
+        self.ops
+            .client()
             .create_collection(
                 CreateCollectionBuilder::new(&self.collection)
                     .vectors_config(VectorParamsBuilder::new(vector_size, Distance::Cosine))
@@ -91,30 +86,17 @@
             .await
             .map_err(Box::new)?;
 
-        self.qdrant
-            .create_field_index(CreateFieldIndexCollectionBuilder::new(
-                &self.collection,
-                "language",
-                FieldType::Keyword,
-            ))
-            .await
-            .map_err(Box::new)?;
-        self.qdrant
-            .create_field_index(CreateFieldIndexCollectionBuilder::new(
-                &self.collection,
-                "file_path",
-                FieldType::Keyword,
-            ))
-            .await
-            .map_err(Box::new)?;
-        self.qdrant
-            .create_field_index(CreateFieldIndexCollectionBuilder::new(
-                &self.collection,
-                "node_type",
-                FieldType::Keyword,
-            ))
-            .await
-            .map_err(Box::new)?;
+        for field in ["language", "file_path", "node_type"] {
+            self.ops
+                .client()
+                .create_field_index(CreateFieldIndexCollectionBuilder::new(
+                    &self.collection,
+                    field,
+                    FieldType::Keyword,
+                ))
+                .await
+                .map_err(Box::new)?;
+        }
 
         Ok(())
     }
@@ -127,26 +109,24 @@
     pub async fn upsert_chunk(&self, chunk: &ChunkInsert<'_>, vector: Vec<f32>) -> Result<String> {
         let point_id = uuid::Uuid::new_v4().to_string();
 
-        let payload: std::collections::HashMap<String, qdrant_client::qdrant::Value> =
-            serde_json::from_value(serde_json::json!({
-                "file_path": chunk.file_path,
-                "language": chunk.language,
-                "node_type": chunk.node_type,
-                "entity_name": chunk.entity_name,
-                "line_start": chunk.line_start,
-                "line_end": chunk.line_end,
-                "code": chunk.code,
-                "scope_chain": chunk.scope_chain,
-                "content_hash": chunk.content_hash,
-            }))?;
-
-        self.qdrant
-            .upsert_points(UpsertPointsBuilder::new(
+        let payload = QdrantOps::json_to_payload(serde_json::json!({
+            "file_path": chunk.file_path,
+            "language": chunk.language,
+            "node_type": chunk.node_type,
+            "entity_name": chunk.entity_name,
+            "line_start": chunk.line_start,
+            "line_end": chunk.line_end,
+            "code": chunk.code,
+            "scope_chain": chunk.scope_chain,
+            "content_hash": chunk.content_hash,
+        }))?;
+
+        self.ops
+            .upsert(
                 &self.collection,
                 vec![PointStruct::new(point_id.clone(), vector, payload)],
-            ))
-            .await
-            .map_err(Box::new)?;
+            )
+            .await?;
 
         let line_start = i64::try_from(chunk.line_start)?;
         let line_end = i64::try_from(chunk.line_end)?;
@@ -205,12 +185,7 @@
             .map(|(id,)| id.clone().into())
             .collect::<Vec<_>>();
 
-        self.qdrant
-            .delete_points(
-                DeletePointsBuilder::new(&self.collection).points(PointsIdsList { ids: point_ids }),
-            )
-            .await
-            .map_err(Box::new)?;
+        self.ops.delete_by_ids(&self.collection, point_ids).await?;
 
         let count = ids.len();
         sqlx::query("DELETE FROM chunk_metadata WHERE file_path = ?")
@@ -232,17 +207,13 @@
         limit: usize,
         filter: Option<Filter>,
     ) -> Result<Vec<SearchHit>> {
-        let mut builder = SearchPointsBuilder::new(&self.collection, query_vector, limit as u64)
-            .with_payload(true);
-
-        if let Some(f) = filter {
-            builder = builder.filter(f);
-        }
-
-        let results = self.qdrant.search_points(builder).await.map_err(Box::new)?;
+        let limit_u64 = u64::try_from(limit)?;
+        let results = self
+            .ops
+            .search(&self.collection, query_vector, limit_u64, filter)
+            .await?;
 
         Ok(results
-            .result
             .iter()
             .filter_map(SearchHit::from_scored_point)
             .collect())
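
The refactor above routes every Qdrant call in store.rs through the `QdrantOps` helper from `zeph-memory`. For reference, the interface implied by those call sites is sketched below. This is reconstructed from the diff alone: the concrete type lives in `zeph-memory`, and the error type (`OpsError` here is a hypothetical stand-in) and exact signatures may differ.

// Sketch of the QdrantOps surface implied by the call sites in this diff.
use qdrant_client::Qdrant;
use qdrant_client::qdrant::{Filter, PointId, PointStruct, ScoredPoint};

// Hypothetical stand-in for whatever error zeph-memory actually returns
// (store.rs wraps it via crate::error::IndexError::Qdrant or plain `?`).
type OpsError = Box<dyn std::error::Error + Send + Sync>;

#[derive(Clone)]
pub struct QdrantOps {
    client: Qdrant,
}

impl QdrantOps {
    /// Connects to Qdrant at `url` (used by `CodeStore::new`).
    pub fn new(url: &str) -> Result<Self, OpsError> {
        todo!("wrap Qdrant::from_url(url).build()")
    }

    /// Raw client for operations the wrapper does not cover
    /// (collection and field-index creation in `ensure_collection`).
    pub fn client(&self) -> &Qdrant {
        &self.client
    }

    /// Lets `ensure_collection` skip re-creating an existing collection.
    pub async fn collection_exists(&self, collection: &str) -> Result<bool, OpsError> {
        todo!()
    }

    /// Turns a JSON object into a point payload (used by `upsert_chunk`);
    /// the old code built the same map with serde_json::from_value directly.
    pub fn json_to_payload(
        value: serde_json::Value,
    ) -> Result<std::collections::HashMap<String, qdrant_client::qdrant::Value>, serde_json::Error> {
        todo!()
    }

    /// Upserts points into `collection` (replaces the UpsertPointsBuilder call).
    pub async fn upsert(&self, collection: &str, points: Vec<PointStruct>) -> Result<(), OpsError> {
        todo!()
    }

    /// Deletes points by id (replaces DeletePointsBuilder + PointsIdsList).
    pub async fn delete_by_ids(&self, collection: &str, ids: Vec<PointId>) -> Result<(), OpsError> {
        todo!()
    }

    /// Vector search with an optional payload filter; returns scored points
    /// that SearchHit::from_scored_point can consume.
    pub async fn search(
        &self,
        collection: &str,
        vector: Vec<f32>,
        limit: u64,
        filter: Option<Filter>,
    ) -> Result<Vec<ScoredPoint>, OpsError> {
        todo!()
    }
}

Centralizing these calls in one wrapper is what lets the three hand-rolled create_field_index blocks, the builder-based upsert and delete, and the SearchPointsBuilder plumbing disappear from store.rs.
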
3 changes: 2 additions & 1 deletion crates/zeph-mcp/Cargo.toml
@@ -8,7 +8,7 @@ repository.workspace = true
 
 [features]
 default = []
-qdrant = ["dep:blake3", "dep:qdrant-client", "dep:uuid"]
+qdrant = ["dep:blake3", "dep:qdrant-client", "dep:uuid", "dep:zeph-memory"]
 
 [dependencies]
 blake3 = { workspace = true, optional = true }
@@ -21,6 +21,7 @@ tokio = { workspace = true, features = ["process", "sync", "time", "rt"] }
 tracing.workspace = true
 uuid = { workspace = true, optional = true, features = ["v5"] }
 zeph-llm.workspace = true
+zeph-memory = { workspace = true, optional = true }
 zeph-tools.workspace = true
 
 [dev-dependencies]
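
The Cargo.toml change wires `zeph-memory` in as an optional dependency tied to the existing `qdrant` feature. A minimal sketch of what that gating means for consuming code follows; the `try_connect` helper is illustrative only and not taken from zeph-mcp.

// Illustrative only: with `dep:zeph-memory` listed under the qdrant feature,
// the crate is compiled solely when zeph-mcp is built with that feature, e.g.
//   cargo build -p zeph-mcp --features qdrant
// so every call site must sit behind the same cfg gate.
#[cfg(feature = "qdrant")]
use zeph_memory::QdrantOps;

#[cfg(feature = "qdrant")]
fn try_connect(url: &str) -> Option<QdrantOps> {
    // QdrantOps::new returns a Result (see the store.rs diff above).
    QdrantOps::new(url).ok()
}
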