Skip to content

Commit

Permalink
fix: separate flexible options
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>
  • Loading branch information
cutecutecat committed Feb 28, 2024
1 parent 2d38087 commit 80c326a
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 45 deletions.
69 changes: 47 additions & 22 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;

use crate::distance::*;
Expand All @@ -7,6 +7,51 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;
use validator::{Validate, ValidationError};

/// Index options that are stored separately from the main index options and
/// held in atomics, so they can be updated through a shared reference.
///
/// Deserialization rejects unknown keys (`deny_unknown_fields`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct IndexFlexibleOptions {
/// Number of threads handed to the optimizing thread pool.
///
/// When the key is absent from the serialized form, the value comes from
/// `IndexFlexibleOptions::default_optimizing_threads`.
#[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
pub optimizing_threads: AtomicU16,
}

impl IndexFlexibleOptions {
    /// Returns the default optimizing thread count: the square root of the
    /// available parallelism (truncated toward zero), or `1` when the
    /// parallelism cannot be queried.
    fn default_optimizing_threads() -> AtomicU16 {
        let count = std::thread::available_parallelism()
            .map(|cores| (cores.get() as f64).sqrt() as u16)
            .unwrap_or(1);
        AtomicU16::new(count)
    }
}

impl Clone for IndexFlexibleOptions {
fn clone(&self) -> Self {
IndexFlexibleOptions {
optimizing_threads: AtomicU16::new(self.optimizing_threads.load(Ordering::Relaxed)),
}
}
}

impl PartialEq for IndexFlexibleOptions {
    /// Two option sets are equal when their current `optimizing_threads`
    /// values (read with relaxed ordering) are equal.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.optimizing_threads.load(Ordering::Relaxed);
        let rhs = other.optimizing_threads.load(Ordering::Relaxed);
        lhs == rhs
    }
}

impl Eq for IndexFlexibleOptions {}

impl Default for IndexFlexibleOptions {
    /// Builds the options with every field set to its `default_*` helper value.
    fn default() -> Self {
        let optimizing_threads = Self::default_optimizing_threads();
        Self { optimizing_threads }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
#[validate(schema(function = "IndexOptions::validate_index_options"))]
Expand Down Expand Up @@ -91,7 +136,7 @@ impl Default for SegmentsOptions {
}
}

#[derive(Debug, Serialize, Deserialize, Validate)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_sealing_secs")]
Expand All @@ -103,19 +148,6 @@ pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_delete_threshold")]
#[validate(range(min = 0.01, max = 1.00))]
pub delete_threshold: f64,
#[serde(default = "OptimizingOptions::default_optimizing_threads")]
pub optimizing_threads: AtomicUsize,
}

impl Clone for OptimizingOptions {
fn clone(&self) -> Self {
OptimizingOptions {
sealing_secs: self.sealing_secs,
sealing_size: self.sealing_size,
delete_threshold: self.delete_threshold,
optimizing_threads: AtomicUsize::new(self.optimizing_threads.load(Ordering::Relaxed)),
}
}
}

impl OptimizingOptions {
Expand All @@ -128,12 +160,6 @@ impl OptimizingOptions {
fn default_delete_threshold() -> f64 {
0.2
}
fn default_optimizing_threads() -> AtomicUsize {
match std::thread::available_parallelism() {
Ok(threads) => AtomicUsize::new((threads.get() as f64).sqrt() as _),
Err(_) => AtomicUsize::new(1),
}
}
}

impl Default for OptimizingOptions {
Expand All @@ -142,7 +168,6 @@ impl Default for OptimizingOptions {
sealing_secs: Self::default_sealing_secs(),
sealing_size: Self::default_sealing_size(),
delete_threshold: Self::default_delete_threshold(),
optimizing_threads: Self::default_optimizing_threads(),
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions crates/service/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ pub mod delete;
pub mod indexing;
pub mod optimizing;
pub mod segments;
pub mod sync;

use self::delete::Delete;
use self::segments::growing::GrowingSegment;
use self::segments::sealed::SealedSegment;
use self::sync::FlexibleOptionSyncing;
use crate::index::optimizing::indexing::OptimizerIndexing;
use crate::index::optimizing::sealing::OptimizerSealing;
use crate::prelude::*;
Expand Down Expand Up @@ -36,6 +38,7 @@ pub struct OutdatedError;
pub struct Index<S: G> {
path: PathBuf,
options: IndexOptions,
flexible: IndexFlexibleOptions,
delete: Arc<Delete>,
protect: Mutex<IndexProtect<S>>,
view: ArcSwap<IndexView<S>>,
Expand Down Expand Up @@ -70,6 +73,7 @@ impl<S: G> Index<S> {
let index = Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible: IndexFlexibleOptions::default(),
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -90,13 +94,18 @@ impl<S: G> Index<S> {
});
OptimizerIndexing::new(index.clone()).spawn();
OptimizerSealing::new(index.clone()).spawn();
FlexibleOptionSyncing::new(index.clone()).spawn();
Ok(index)
}

pub fn open(path: PathBuf) -> Arc<Self> {
let options =
serde_json::from_slice::<IndexOptions>(&std::fs::read(path.join("options")).unwrap())
.unwrap();
let flexible = serde_json::from_slice::<IndexFlexibleOptions>(
&std::fs::read(path.join("flexible")).unwrap_or_default(),
)
.unwrap_or_default();
let tracker = Arc::new(IndexTracker { path: path.clone() });
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
clean(
Expand Down Expand Up @@ -144,6 +153,7 @@ impl<S: G> Index<S> {
let index = Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible,
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -164,6 +174,7 @@ impl<S: G> Index<S> {
});
OptimizerIndexing::new(index.clone()).spawn();
OptimizerSealing::new(index.clone()).spawn();
FlexibleOptionSyncing::new(index.clone()).spawn();
index
}
pub fn options(&self) -> &IndexOptions {
Expand All @@ -175,12 +186,17 @@ impl<S: G> Index<S> {
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
match key.as_str() {
"optimizing.threads" => {
let parsed = usize::from_str(value.as_str())
let parsed = i32::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
self.options
.optimizing
let threads = match parsed {
-1 => IndexFlexibleOptions::default()
.optimizing_threads
.load(Ordering::Relaxed),
threads_limit => threads_limit as u16,
};
self.flexible
.optimizing_threads
.store(parsed, Ordering::Relaxed);
.store(threads, Ordering::Relaxed);
}
_ => return Err(SettingError::BadKey { key }),
};
Expand Down
26 changes: 9 additions & 17 deletions crates/service/src/index/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,15 @@ impl<S: G> OptimizerIndexing<S> {
let Some(index) = weak_index.upgrade() else {
return;
};
let threads = match index
.options
.optimizing
.optimizing_threads
.load(Ordering::Relaxed)
{
0 => OptimizingOptions::default()
.optimizing_threads
.load(Ordering::Relaxed),
threads_limit => threads_limit,
};
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads)
.build()
.unwrap();
if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) {
continue;
let threads = index.flexible.optimizing_threads.load(Ordering::Relaxed);
if threads > 0 {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build()
.unwrap();
if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) {
continue;
}
}
}
std::thread::sleep(std::time::Duration::from_secs(60));
Expand Down
42 changes: 42 additions & 0 deletions crates/service/src/index/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use super::Index;
use crate::prelude::*;
use base::index::IndexFlexibleOptions;
use std::sync::Arc;

/// Background worker that writes an index's flexible options to the
/// `flexible` file in the index directory whenever they change.
pub struct FlexibleOptionSyncing<S: G> {
/// The index whose flexible options are watched.
index: Arc<Index<S>>,
}

impl<S: G> FlexibleOptionSyncing<S> {
    /// Creates a syncing worker for `index`.
    pub fn new(index: Arc<Index<S>>) -> Self {
        Self { index }
    }

    /// Runs [`Self::main`] on a newly spawned, detached thread.
    pub fn spawn(self) {
        std::thread::spawn(move || {
            self.main();
        });
    }

    /// Polls the index's flexible options every five seconds and writes them,
    /// serialized as JSON, to the `flexible` file in the index directory
    /// whenever they differ from the reference snapshot (taken at startup and
    /// refreshed after each successful write).
    ///
    /// Returns once the index has been dropped. A failed write is reported on
    /// stderr and retried on the next tick instead of panicking, so a transient
    /// I/O error no longer kills the syncing thread for good.
    pub fn main(self) {
        let index = self.index;
        // Hold only a weak handle between polls so this thread never keeps the
        // index alive on its own.
        let weak_index = Arc::downgrade(&index);
        let mut old_options = index.flexible.clone();
        drop(index);
        loop {
            {
                let Some(index) = weak_index.upgrade() else {
                    return;
                };
                let new_options = index.flexible.clone();
                if new_options != old_options {
                    let target = index.path.join("flexible");
                    let written = serde_json::to_string::<IndexFlexibleOptions>(&new_options)
                        .map_err(std::io::Error::from)
                        .and_then(|json| std::fs::write(&target, json));
                    match written {
                        // Advance the snapshot only after the data reached disk,
                        // so a failed write is retried on the next poll.
                        Ok(()) => old_options = new_options,
                        Err(e) => {
                            eprintln!("failed to persist flexible options to {:?}: {}", target, e);
                        }
                    }
                }
            }
            std::thread::sleep(std::time::Duration::from_secs(5));
        }
    }
}
3 changes: 3 additions & 0 deletions crates/service/src/utils/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub fn clean(path: impl AsRef<Path>, wanted: impl Iterator<Item = String>) {
.unwrap();
let wanted = HashSet::<String>::from_iter(wanted);
for dir in dirs {
if dir.path().is_file() {
log::info!("Unexpected file {:?}, skip.", dir.path());
}
let filename = dir.file_name();
let filename = filename.to_str().unwrap();
let p = path.as_ref().join(filename);
Expand Down
2 changes: 1 addition & 1 deletion src/index/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pgrx::error;
use pgrx::pg_sys::{AsPgCStr, RelnameGetRelid};

#[pgrx::pg_extern(volatile, strict)]
fn _vectors_index_setting(index: String, key: String, value: String) {
fn _vectors_alter_vector_index(index: String, key: String, value: String) {
unsafe {
let oid = RelnameGetRelid(index.as_pg_cstr());
let id = Handle::from_sys(oid);
Expand Down
2 changes: 1 addition & 1 deletion src/sql/finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ END;
$$;

CREATE FUNCTION alter_vector_index(input TEXT, "key" TEXT, "value" TEXT) RETURNS void
STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_index_setting_wrapper';
STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_alter_vector_index_wrapper';

-- List of casts

Expand Down

0 comments on commit 80c326a

Please sign in to comment.