Skip to content

Commit

Permalink
feat: add optiming_threads to index
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>
  • Loading branch information
cutecutecat committed Mar 21, 2024
1 parent 402916d commit cd2a473
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 11 deletions.
67 changes: 57 additions & 10 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::distance::*;
use crate::vector::*;
use serde::{Deserialize, Serialize};
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use thiserror::Error;
use uuid::Uuid;
use validator::{Validate, ValidationError};
Expand Down Expand Up @@ -78,6 +80,61 @@ pub enum StatError {
NotExist,
}

#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum SettingError {
#[error("Setting key {key} is not exist.")]
BadKey { key: String },
#[error("Setting key {key} has a wrong value {value}.")]
BadValue { key: String, value: String },
#[error("Index not found.")]
NotExist,
}

#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct IndexFlexibleOptions {
#[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
pub optimizing_threads: AtomicU16,
}

impl IndexFlexibleOptions {
fn default_optimizing_threads() -> AtomicU16 {
match std::thread::available_parallelism() {
Ok(threads) => AtomicU16::new((threads.get() as f64).sqrt() as _),
Err(_) => AtomicU16::new(1),
}
}
pub fn optimizing_threads_eq(&self, other: &Self) -> bool {
self.optimizing_threads.load(Ordering::Relaxed)
== other.optimizing_threads.load(Ordering::Relaxed)
}
}

impl Clone for IndexFlexibleOptions {
fn clone(&self) -> Self {
IndexFlexibleOptions {
optimizing_threads: AtomicU16::new(self.optimizing_threads.load(Ordering::Relaxed)),
}
}
}

impl PartialEq for IndexFlexibleOptions {
fn eq(&self, other: &Self) -> bool {
self.optimizing_threads_eq(other)
}
}

impl Eq for IndexFlexibleOptions {}

impl Default for IndexFlexibleOptions {
fn default() -> Self {
Self {
optimizing_threads: Self::default_optimizing_threads(),
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
#[validate(schema(function = "IndexOptions::validate_index_options"))]
Expand Down Expand Up @@ -198,9 +255,6 @@ pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_delete_threshold")]
#[validate(range(min = 0.01, max = 1.00))]
pub delete_threshold: f64,
#[serde(default = "OptimizingOptions::default_optimizing_threads")]
#[validate(range(min = 1, max = 65535))]
pub optimizing_threads: usize,
}

impl OptimizingOptions {
Expand All @@ -213,12 +267,6 @@ impl OptimizingOptions {
fn default_delete_threshold() -> f64 {
0.2
}
fn default_optimizing_threads() -> usize {
match std::thread::available_parallelism() {
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
}
}
}

impl Default for OptimizingOptions {
Expand All @@ -227,7 +275,6 @@ impl Default for OptimizingOptions {
sealing_secs: Self::default_sealing_secs(),
sealing_size: Self::default_sealing_size(),
delete_threshold: Self::default_delete_threshold(),
optimizing_threads: Self::default_optimizing_threads(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub trait WorkerOperations {
fn view_vbase(&self, handle: Handle) -> Result<impl ViewVbaseOperations, VbaseError>;
fn view_list(&self, handle: Handle) -> Result<impl ViewListOperations, ListError>;
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>;
}

pub trait ViewBasicOperations {
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub fn clean(path: impl AsRef<Path>, wanted: impl Iterator<Item = String>) {
.unwrap();
let wanted = HashSet::<String>::from_iter(wanted);
for dir in dirs {
if dir.path().is_file() {
log::info!("Unexpected file {:?}, skip.", dir.path());
}
let filename = dir.file_name();
let filename = filename.to_str().unwrap();
let p = path.as_ref().join(filename);
Expand Down
47 changes: 47 additions & 0 deletions crates/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ pub mod indexing;
pub mod optimizing;
pub mod segments;

mod setting;
mod utils;

use self::delete::Delete;
use self::segments::growing::GrowingSegment;
use self::segments::sealed::SealedSegment;
use crate::optimizing::indexing::OptimizerIndexing;
use crate::optimizing::sealing::OptimizerSealing;
use crate::setting::FlexibleOptionSyncing;
use crate::utils::tournament_tree::LoserTree;
use arc_swap::ArcSwap;
pub use base::distance::*;
Expand All @@ -34,6 +36,8 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::Infallible;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
Expand All @@ -51,18 +55,21 @@ pub struct OutdatedError;
pub struct Index<O: Op> {
path: PathBuf,
options: IndexOptions,
flexible: IndexFlexibleOptions,
delete: Arc<Delete>,
protect: Mutex<IndexProtect<O>>,
view: ArcSwap<IndexView<O>>,
instant_index: AtomicCell<Instant>,
instant_write: AtomicCell<Instant>,
background_indexing: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
background_sealing: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
background_setting: Mutex<Option<(Sender<Infallible>, JoinHandle<()>)>>,
_tracker: Arc<IndexTracker>,
}

impl<O: Op> Index<O> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, CreateError> {
let flexible = IndexFlexibleOptions::default();
if let Err(err) = options.validate() {
return Err(CreateError::InvalidIndexOptions {
reason: err.to_string(),
Expand All @@ -87,6 +94,7 @@ impl<O: Op> Index<O> {
let index = Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible,
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -105,6 +113,7 @@ impl<O: Op> Index<O> {
instant_write: AtomicCell::new(Instant::now()),
background_indexing: Mutex::new(None),
background_sealing: Mutex::new(None),
background_setting: Mutex::new(None),
_tracker: Arc::new(IndexTracker { path }),
});
Ok(index)
Expand All @@ -114,6 +123,10 @@ impl<O: Op> Index<O> {
let options =
serde_json::from_slice::<IndexOptions>(&std::fs::read(path.join("options")).unwrap())
.unwrap();
let flexible = serde_json::from_slice::<IndexFlexibleOptions>(
&std::fs::read(path.join("flexible")).unwrap_or_default(),
)
.unwrap_or_default();
let tracker = Arc::new(IndexTracker { path: path.clone() });
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
clean(
Expand Down Expand Up @@ -161,6 +174,7 @@ impl<O: Op> Index<O> {
Arc::new(Index {
path: path.clone(),
options: options.clone(),
flexible,
delete: delete.clone(),
protect: Mutex::new(IndexProtect {
startup,
Expand All @@ -179,6 +193,7 @@ impl<O: Op> Index<O> {
instant_write: AtomicCell::new(Instant::now()),
background_indexing: Mutex::new(None),
background_sealing: Mutex::new(None),
background_setting: Mutex::new(None),
_tracker: tracker,
})
}
Expand All @@ -188,6 +203,25 @@ impl<O: Op> Index<O> {
pub fn view(&self) -> Arc<IndexView<O>> {
self.view.load_full()
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
match key.as_str() {
"optimizing.threads" => {
let parsed = i32::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
let threads = match parsed {
-1 => IndexFlexibleOptions::default()
.optimizing_threads
.load(Ordering::Relaxed),
threads_limit => threads_limit as u16,
};
self.flexible
.optimizing_threads
.store(threads, Ordering::Relaxed);
}
_ => return Err(SettingError::BadKey { key }),
};
Ok(())
}
pub fn refresh(&self) {
let mut protect = self.protect.lock();
if let Some((uuid, write)) = protect.write.clone() {
Expand Down Expand Up @@ -256,6 +290,12 @@ impl<O: Op> Index<O> {
*background_sealing = Some(OptimizerSealing::new(self.clone()).spawn());
}
}
{
let mut background_setting = self.background_setting.lock();
if background_setting.is_none() {
*background_setting = Some(FlexibleOptionSyncing::new(self.clone()).spawn());
}
}
}
pub fn stop(&self) {
{
Expand All @@ -272,6 +312,13 @@ impl<O: Op> Index<O> {
let _ = join_handle.join();
}
}
{
let mut background_setting = self.background_setting.lock();
if let Some((sender, join_handle)) = background_setting.take() {
drop(sender);
let _ = join_handle.join();
}
}
}
pub fn wait(&self) -> Arc<IndexTracker> {
Arc::clone(&self._tracker)
Expand Down
4 changes: 3 additions & 1 deletion crates/index/src/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crossbeam::channel::TryRecvError;
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::cmp::Reverse;
use std::convert::Infallible;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
Expand Down Expand Up @@ -102,8 +103,9 @@ impl<O: Op> OptimizerIndexing<O> {
}
fn main(self, shutdown: Receiver<Infallible>) {
let index = self.index;
let threads = index.flexible.optimizing_threads.load(Ordering::Relaxed);
rayon::ThreadPoolBuilder::new()
.num_threads(index.options.optimizing.optimizing_threads)
.num_threads(threads as usize)
.build_scoped(|pool| {
std::thread::scope(|scope| {
scope.spawn(|| match shutdown.recv() {
Expand Down
60 changes: 60 additions & 0 deletions crates/index/src/setting.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use super::Index;
use crate::optimizing::indexing::OptimizerIndexing;
use crate::Op;
use base::index::IndexFlexibleOptions;
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::convert::Infallible;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

pub struct FlexibleOptionSyncing<O: Op> {
index: Arc<Index<O>>,
}

impl<O: Op> FlexibleOptionSyncing<O> {
pub fn new(index: Arc<Index<O>>) -> Self {
Self { index }
}
pub fn spawn(self) -> (Sender<Infallible>, JoinHandle<()>) {
let (tx, rx) = bounded(1);
(
tx,
std::thread::spawn(move || {
self.main(rx);
}),
)
}
pub fn main(self, shutdown: Receiver<Infallible>) {
let index = &self.index;
let mut old_options = index.flexible.clone();
let dur = Duration::from_secs(5);
loop {
let new_options = index.flexible.clone();
if new_options != old_options {
std::fs::write(
index.path.join("flexible"),
serde_json::to_string::<IndexFlexibleOptions>(&new_options).unwrap(),
)
.unwrap();
old_options = new_options.clone();
}
self.update_optimizing_threads(new_options.clone(), old_options.clone());
match shutdown.recv_timeout(dur) {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => return,
Err(RecvTimeoutError::Timeout) => continue,
}
}
}
fn update_optimizing_threads(&self, new: IndexFlexibleOptions, old: IndexFlexibleOptions) {
if !new.optimizing_threads_eq(&old) {
let mut background = self.index.background_indexing.lock();
if let Some((sender, join_handle)) = background.take() {
drop(sender);
let _ = join_handle.join();
*background = Some(OptimizerIndexing::new(self.index.clone()).spawn());
}
}
}
}
20 changes: 20 additions & 0 deletions crates/service/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,26 @@ impl Instance {
Instance::Veci8Dot(x) => x.stat(),
}
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
match self {
Instance::Vecf32Cos(x) => x.setting(key, value),
Instance::Vecf32Dot(x) => x.setting(key, value),
Instance::Vecf32L2(x) => x.setting(key, value),
Instance::Vecf16Cos(x) => x.setting(key, value),
Instance::Vecf16Dot(x) => x.setting(key, value),
Instance::Vecf16L2(x) => x.setting(key, value),
Instance::SVecf32Cos(x) => x.setting(key, value),
Instance::SVecf32Dot(x) => x.setting(key, value),
Instance::SVecf32L2(x) => x.setting(key, value),
Instance::BVecf32Cos(x) => x.setting(key, value),
Instance::BVecf32Dot(x) => x.setting(key, value),
Instance::BVecf32L2(x) => x.setting(key, value),
Instance::BVecf32Jaccard(x) => x.setting(key, value),
Instance::Veci8L2(x) => x.setting(key, value),
Instance::Veci8Cos(x) => x.setting(key, value),
Instance::Veci8Dot(x) => x.setting(key, value),
}
}
pub fn start(&self) {
match self {
Instance::Vecf32Cos(x) => x.start(),
Expand Down
5 changes: 5 additions & 0 deletions crates/service/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ impl WorkerOperations for Worker {
let stat = instance.stat();
Ok(stat)
}
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> {
let view = self.view();
let instance = view.get(handle).ok_or(SettingError::NotExist)?;
instance.setting(key, value)
}
}

pub struct WorkerView {
Expand Down
8 changes: 8 additions & 0 deletions src/bgworker/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ fn session(worker: Arc<Worker>, handler: ServerRpcHandler) -> Result<Infallible,
ServerRpcHandle::Stat { handle, x } => {
handler = x.leave(worker.stat(handle))?;
}
ServerRpcHandle::Setting {
handle,
key,
value,
x,
} => {
handler = x.leave(worker.setting(handle, key, value))?;
}
ServerRpcHandle::Basic {
handle,
vector,
Expand Down
Loading

0 comments on commit cd2a473

Please sign in to comment.