Skip to content

Commit

Permalink
feat: transactional CREATE/DROP
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <usamoi@outlook.com>
  • Loading branch information
usamoi committed Jan 17, 2024
1 parent db44d23 commit 66faf0d
Show file tree
Hide file tree
Showing 46 changed files with 516 additions and 433 deletions.
4 changes: 4 additions & 0 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[advisories]
ignore = [
"RUSTSEC-2021-0127", # serde_cbor is unmaintained / serde_cbor is not used
]
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ mockall = "0.12"

[lints]
clippy.too_many_arguments = "allow"
clippy.unnecessary_literal_unwrap = "allow"
clippy.unnecessary_unwrap = "allow"
rust.unsafe_op_in_unsafe_fn = "warn"
rust.unsafe_op_in_unsafe_fn = "deny"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"

[workspace]
resolver = "2"
Expand Down
2 changes: 1 addition & 1 deletion crates/c/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ fn main() {
.file("./src/c.c")
.opt_level(3)
.debug(true)
.compile("pgvectorsc");
.compile("vectorsc");
}
13 changes: 0 additions & 13 deletions crates/c/src/c.rs

This file was deleted.

17 changes: 13 additions & 4 deletions crates/c/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
mod c;

#[allow(unused_imports)]
pub use self::c::*;
// Raw FFI declarations for the C routines in the statically linked `vectorsc`
// library. They are only declared when compiling for x86_64.
//
// Every function here has the same shape: two `*const u16` pointers, a `usize`
// length `n`, and an `f32` result. As with all items in an `extern` block,
// calling them is `unsafe`.
//
// NOTE(review): judging by the names alone, these look like f16 cosine / dot /
// squared-L2 kernels, with the `avx512fp16`, `v4` and `v3` suffixes selecting a
// CPU feature level; `a` and `b` presumably point to `n` half-precision values
// stored as raw `u16` bits. Confirm against the C source before relying on it,
// and check which CPU features the caller must detect before each call.
#[cfg(target_arch = "x86_64")]
#[link(name = "vectorsc", kind = "static")]
extern "C" {
// `avx512fp16` variants.
pub fn v_f16_cosine_avx512fp16(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_dot_avx512fp16(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_sl2_avx512fp16(a: *const u16, b: *const u16, n: usize) -> f32;
// `v4` variants.
pub fn v_f16_cosine_v4(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_dot_v4(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_sl2_v4(a: *const u16, b: *const u16, n: usize) -> f32;
// `v3` variants.
pub fn v_f16_cosine_v3(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_dot_v3(a: *const u16, b: *const u16, n: usize) -> f32;
pub fn v_f16_sl2_v3(a: *const u16, b: *const u16, n: usize) -> f32;
}
6 changes: 4 additions & 2 deletions crates/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,7 @@ clippy.derivable_impls = "allow"
clippy.len_without_is_empty = "allow"
clippy.needless_range_loop = "allow"
clippy.too_many_arguments = "allow"
clippy.unit_arg = "allow"
rust.unsafe_op_in_unsafe_fn = "warn"
rust.internal_features = "allow"
rust.unsafe_op_in_unsafe_fn = "forbid"
rust.unused_lifetimes = "warn"
rust.unused_qualifications = "warn"
4 changes: 3 additions & 1 deletion crates/service/src/algorithms/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ pub struct HnswMmap<S: G> {
}

#[derive(Debug, Clone, Copy, Default)]
struct HnswMmapEdge(F32, u32);
struct HnswMmapEdge(#[allow(dead_code)] F32, u32);
// We may convert a memory-mapped graph to an in-memory graph
// in order to speed up merging sealed segments.

unsafe impl<S: G> Send for HnswMmap<S> {}
unsafe impl<S: G> Sync for HnswMmap<S> {}
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/index/indexing/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Default for FlatIndexingOptions {
}

pub struct FlatIndexing<S: G> {
raw: crate::algorithms::flat::Flat<S>,
raw: Flat<S>,
}

impl<S: G> AbstractIndexing<S> for FlatIndexing<S> {
Expand Down
11 changes: 5 additions & 6 deletions crates/service/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use self::delete::Delete;
use self::indexing::IndexingOptions;
use self::optimizing::OptimizingOptions;
use self::segments::growing::GrowingSegment;
use self::segments::growing::GrowingSegmentInsertError;
use self::segments::sealed::SealedSegment;
use self::segments::SegmentsOptions;
use crate::index::optimizing::indexing::OptimizerIndexing;
Expand All @@ -34,7 +33,7 @@ use validator::Validate;

#[derive(Debug, Error)]
#[error("The index view is outdated.")]
pub struct OutdatedError(#[from] pub Option<GrowingSegmentInsertError>);
pub struct OutdatedError;

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -103,7 +102,6 @@ pub struct Index<S: G> {

impl<S: G> Index<S> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, ServiceError> {
assert!(options.validate().is_ok());
if let Err(err) = options.validate() {
return Err(ServiceError::BadOption {
validation: err.to_string(),
Expand Down Expand Up @@ -501,12 +499,13 @@ impl<S: G> IndexView<S> {
}
let payload = (pointer.as_u48() << 16) | self.delete.version(pointer) as Payload;
if let Some((_, growing)) = self.write.as_ref() {
if let Err(e) = growing.insert(vector, payload) {
return Ok(Err(OutdatedError(Some(e))));
use crate::index::segments::growing::GrowingSegmentInsertError;
if let Err(GrowingSegmentInsertError) = growing.insert(vector, payload) {
return Ok(Err(OutdatedError));
}
Ok(Ok(()))
} else {
Ok(Err(OutdatedError(None)))
Ok(Err(OutdatedError))
}
}
pub fn delete<F: FnMut(Pointer) -> bool>(&self, mut f: F) {
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/index/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl<S: G> OptimizerIndexing<S> {
.build()
.unwrap();
let weak_index = Arc::downgrade(&index);
std::mem::drop(index);
drop(index);
loop {
{
let Some(index) = weak_index.upgrade() else {
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/index/optimizing/sealing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl<S: G> OptimizerSealing<S> {
let dur = Duration::from_secs(index.options.optimizing.sealing_secs);
let least = index.options.optimizing.sealing_size;
let weak_index = Arc::downgrade(&index);
std::mem::drop(index);
drop(index);
let mut check = None;
loop {
{
Expand Down
43 changes: 16 additions & 27 deletions crates/service/src/instance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,15 @@ impl Instance {
(Distance::L2, Kind::F16) => Self::F16L2(Index::open(path)),
}
}
pub fn options(&self) -> Result<&IndexOptions, ServiceError> {
pub fn refresh(&self) {
match self {
Instance::F32Cos(x) => Ok(x.options()),
Instance::F32Dot(x) => Ok(x.options()),
Instance::F32L2(x) => Ok(x.options()),
Instance::F16Cos(x) => Ok(x.options()),
Instance::F16Dot(x) => Ok(x.options()),
Instance::F16L2(x) => Ok(x.options()),
Instance::Upgrade => Err(ServiceError::Upgrade2),
}
}
pub fn refresh(&self) -> Result<(), ServiceError> {
match self {
Instance::F32Cos(x) => Ok(x.refresh()),
Instance::F32Dot(x) => Ok(x.refresh()),
Instance::F32L2(x) => Ok(x.refresh()),
Instance::F16Cos(x) => Ok(x.refresh()),
Instance::F16Dot(x) => Ok(x.refresh()),
Instance::F16L2(x) => Ok(x.refresh()),
Instance::Upgrade => Err(ServiceError::Upgrade2),
Instance::F32Cos(x) => x.refresh(),
Instance::F32Dot(x) => x.refresh(),
Instance::F32L2(x) => x.refresh(),
Instance::F16Cos(x) => x.refresh(),
Instance::F16Dot(x) => x.refresh(),
Instance::F16L2(x) => x.refresh(),
Instance::Upgrade => (),
}
}
pub fn view(&self) -> Result<InstanceView, ServiceError> {
Expand All @@ -105,15 +94,15 @@ impl Instance {
Instance::Upgrade => Err(ServiceError::Upgrade2),
}
}
pub fn stat(&self) -> Result<IndexStat, ServiceError> {
pub fn stat(&self) -> IndexStat {
match self {
Instance::F32Cos(x) => Ok(x.stat()),
Instance::F32Dot(x) => Ok(x.stat()),
Instance::F32L2(x) => Ok(x.stat()),
Instance::F16Cos(x) => Ok(x.stat()),
Instance::F16Dot(x) => Ok(x.stat()),
Instance::F16L2(x) => Ok(x.stat()),
Instance::Upgrade => Ok(IndexStat::Upgrade),
Instance::F32Cos(x) => x.stat(),
Instance::F32Dot(x) => x.stat(),
Instance::F32L2(x) => x.stat(),
Instance::F16Cos(x) => x.stat(),
Instance::F16Dot(x) => x.stat(),
Instance::F16L2(x) => x.stat(),
Instance::Upgrade => IndexStat::Upgrade,
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion crates/service/src/prelude/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use thiserror::Error;

#[must_use]
#[derive(Debug, Error, Serialize, Deserialize)]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
#[rustfmt::skip]
pub enum ServiceError {
#[error("\
Expand All @@ -15,6 +15,10 @@ The index is not existing in the background worker.
ADVICE: Drop or rebuild the index.\
")]
UnknownIndex,
#[error("\
The index is already existing in the background worker.\
")]
KnownIndex,
#[error("\
The given vector is invalid for input.
ADVICE: Check if dimensions and scalar type of the vector is matched with the index.\
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/prelude/global/f32_dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl G for F32Dot {
}

fn elkan_k_means_distance(lhs: &[F32], rhs: &[F32]) -> F32 {
super::f32_dot::dot(lhs, rhs).acos()
dot(lhs, rhs).acos()
}

#[multiversion::multiversion(targets(
Expand Down
15 changes: 8 additions & 7 deletions crates/service/src/prelude/global/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ pub use f32_l2::F32L2;

use crate::prelude::*;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::fmt::{Debug, Display};

pub trait G: Copy + std::fmt::Debug + 'static {
pub trait G: Copy + Debug + 'static {
type Scalar: Copy
+ Send
+ Sync
+ std::fmt::Debug
+ std::fmt::Display
+ serde::Serialize
+ for<'a> serde::Deserialize<'a>
+ Debug
+ Display
+ Serialize
+ for<'a> Deserialize<'a>
+ Ord
+ bytemuck::Zeroable
+ bytemuck::Pod
+ num_traits::Float
+ Float
+ Zero
+ num_traits::NumOps
+ num_traits::NumAssignOps
+ FloatCast;
Expand Down
10 changes: 5 additions & 5 deletions crates/service/src/utils/file_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl FileWal {
($t: expr) => {
match $t {
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
self.status = WalStatus::Truncate;
self.status = Truncate;
return None;
}
Err(e) => panic!("{}", e),
Expand All @@ -68,7 +68,7 @@ impl FileWal {
let mut data = vec![0u8; len as usize];
resolve_eof!(self.file.read_exact(&mut data));
if crc32(&data) != crc {
self.status = WalStatus::Truncate;
self.status = Truncate;
return None;
}
self.offset += 4 + 4 + data.len();
Expand All @@ -83,7 +83,7 @@ impl FileWal {
.set_len(self.offset as _)
.expect("Failed to truncate wal.");
self.file.sync_all().expect("Failed to flush wal.");
self.status = WalStatus::Flush;
self.status = Flush;
}
pub fn write(&mut self, bytes: &[u8]) {
use byteorder::WriteBytesExt;
Expand All @@ -100,15 +100,15 @@ impl FileWal {
.expect("Failed to write wal.");
self.file.write_all(bytes).expect("Failed to write wal.");
self.offset += 4 + 4 + bytes.len();
self.status = WalStatus::Write;
self.status = Write;
}
pub fn sync_all(&mut self) {
use WalStatus::*;
let (Write | Flush) = self.status else {
panic!("Operation not permitted.")
};
self.file.sync_all().expect("Failed to flush wal.");
self.status = WalStatus::Flush;
self.status = Flush;
}
}

Expand Down
1 change: 0 additions & 1 deletion crates/service/src/utils/mmap_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ where
let file = std::fs::OpenOptions::new()
.create_new(true)
.read(true)
.write(true)
.append(true)
.open(path)
.unwrap();
Expand Down
Loading

0 comments on commit 66faf0d

Please sign in to comment.