Skip to content

Commit

Permalink
fix by comments
Browse files Browse the repository at this point in the history
Signed-off-by: cutecutecat <junyuchen@tensorchord.ai>
  • Loading branch information
cutecutecat committed Mar 26, 2024
1 parent 64170ff commit 3b3fd0e
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 79 deletions.
10 changes: 4 additions & 6 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub enum StatError {

#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum SettingError {
pub enum AlterError {
#[error("Setting key {key} is not exist.")]
BadKey { key: String },
#[error("Setting key {key} has a wrong value {value}.")]
Expand All @@ -89,19 +89,17 @@ pub enum SettingError {
NotExist,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
pub struct IndexFlexibleOptions {
#[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
#[validate(range(min = 1, max = 65535))]
pub optimizing_threads: u16,
}

impl IndexFlexibleOptions {
pub fn default_optimizing_threads() -> u16 {
match std::thread::available_parallelism() {
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
}
1
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait WorkerOperations {
fn view_vbase(&self, handle: Handle) -> Result<impl ViewVbaseOperations, VbaseError>;
fn view_list(&self, handle: Handle) -> Result<impl ViewListOperations, ListError>;
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>;
fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError>;
}

pub trait ViewBasicOperations {
Expand Down
29 changes: 13 additions & 16 deletions crates/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ pub struct Index<O: Op> {

impl<O: Op> Index<O> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, CreateError> {
let flexible = IndexFlexibleOptions::default();
if let Err(err) = options.validate() {
return Err(CreateError::InvalidIndexOptions {
reason: err.to_string(),
Expand All @@ -82,7 +81,7 @@ impl<O: Op> Index<O> {
IndexStartup {
sealeds: HashSet::new(),
growings: HashSet::new(),
flexible,
flexible: IndexFlexibleOptions::default(),
},
);
let delete = Delete::create(path.join("delete"));
Expand All @@ -99,6 +98,7 @@ impl<O: Op> Index<O> {
}),
view: ArcSwap::new(Arc::new(IndexView {
options: options.clone(),
flexible: IndexFlexibleOptions::default(),
sealed: HashMap::new(),
growing: HashMap::new(),
delete: delete.clone(),
Expand All @@ -119,6 +119,7 @@ impl<O: Op> Index<O> {
.unwrap();
let tracker = Arc::new(IndexTracker { path: path.clone() });
let startup = FileAtomic::<IndexStartup>::open(path.join("startup"));
let flexible = startup.get().flexible.clone();
clean(
path.join("segments"),
startup
Expand Down Expand Up @@ -173,6 +174,7 @@ impl<O: Op> Index<O> {
}),
view: ArcSwap::new(Arc::new(IndexView {
options: options.clone(),
flexible,
delete: delete.clone(),
sealed,
growing,
Expand All @@ -191,25 +193,20 @@ impl<O: Op> Index<O> {
pub fn view(&self) -> Arc<IndexView<O>> {
self.view.load_full()
}
pub fn setting(self: &Arc<Self>, key: String, value: String) -> Result<(), SettingError> {
pub fn alter(self: &Arc<Self>, key: String, value: String) -> Result<(), AlterError> {
let mut protect = self.protect.lock();
match key.as_str() {
"optimizing.threads" => {
let parsed = i32::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
.map_err(|_e| AlterError::BadValue { key, value })?;
let optimizing_threads = match parsed {
-1 => IndexFlexibleOptions::default_optimizing_threads(),
threads_limit => threads_limit as u16,
};
protect.flexible_set(IndexFlexibleOptions { optimizing_threads });
let mut background = self.background_indexing.lock();
if let Some((sender, join_handle)) = background.take() {
drop(sender);
let _ = join_handle.join();
*background = Some(OptimizerIndexing::new(self.clone()).spawn());
}
protect.maintain(self.options.clone(), self.delete.clone(), &self.view);
}
_ => return Err(SettingError::BadKey { key }),
_ => return Err(AlterError::BadKey { key }),
};
Ok(())
}
Expand Down Expand Up @@ -320,6 +317,7 @@ impl Drop for IndexTracker {

pub struct IndexView<O: Op> {
pub options: IndexOptions,
pub flexible: IndexFlexibleOptions,
pub delete: Arc<Delete>,
pub sealed: HashMap<Uuid, Arc<SealedSegment<O>>>,
pub growing: HashMap<Uuid, Arc<GrowingSegment<O>>>,
Expand Down Expand Up @@ -545,14 +543,17 @@ struct IndexProtect<O: Op> {
}

impl<O: Op> IndexProtect<O> {
/// Export IndexProtect to IndexView
fn maintain(
&mut self,
options: IndexOptions,
delete: Arc<Delete>,
swap: &ArcSwap<IndexView<O>>,
) {
let old_startup = self.startup.get();
let view = Arc::new(IndexView {
options,
flexible: old_startup.flexible.clone(),
delete,
sealed: self.sealed.clone(),
growing: self.growing.clone(),
Expand All @@ -564,7 +565,7 @@ impl<O: Op> IndexProtect<O> {
self.startup.set(IndexStartup {
sealeds: startup_sealeds,
growings: startup_growings,
flexible: self.flexible_get(),
flexible: old_startup.flexible.clone(),
});
swap.swap(view);
}
Expand All @@ -576,8 +577,4 @@ impl<O: Op> IndexProtect<O> {
flexible,
});
}
fn flexible_get(&self) -> IndexFlexibleOptions {
let src = self.startup.get();
src.flexible.clone()
}
}
54 changes: 22 additions & 32 deletions crates/index/src/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use base::operator::Borrowed;
pub use base::search::*;
pub use base::vector::*;
use crossbeam::channel::RecvError;
use crossbeam::channel::TryRecvError;
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
use std::cmp::Reverse;
use std::convert::Infallible;
Expand Down Expand Up @@ -102,38 +101,29 @@ impl<O: Op> OptimizerIndexing<O> {
}
fn main(self, shutdown: Receiver<Infallible>) {
let index = self.index;
let threads = {
let protect = index.protect.lock();
protect.flexible_get().optimizing_threads
};
rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build_scoped(|pool| {
std::thread::scope(|scope| {
scope.spawn(|| match shutdown.recv() {
Ok(never) => match never {},
Err(RecvError) => {
pool.stop();
}
});
loop {
if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) {
match shutdown.try_recv() {
Ok(never) => match never {},
Err(TryRecvError::Disconnected) => return,
Err(TryRecvError::Empty) => (),
}
continue;
}
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
loop {
let view = index.view();
let threads = view.flexible.optimizing_threads;
rayon::ThreadPoolBuilder::new()
.num_threads(threads as usize)
.build_scoped(|pool| {
std::thread::scope(|scope| {
scope.spawn(|| match shutdown.recv() {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => return,
Err(RecvTimeoutError::Timeout) => (),
}
}
});
})
.unwrap();
Err(RecvError) => {
pool.stop();
}
});
let _ = pool.install(|| optimizing_indexing(index.clone()));
})
})
.unwrap();
match shutdown.recv_timeout(std::time::Duration::from_secs(60)) {
Ok(never) => match never {},
Err(RecvTimeoutError::Disconnected) => return,
Err(RecvTimeoutError::Timeout) => (),
}
}
}
}

Expand Down
34 changes: 17 additions & 17 deletions crates/service/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,24 +187,24 @@ impl Instance {
Instance::Veci8Dot(x) => x.stat(),
}
}
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
pub fn alter(&self, key: String, value: String) -> Result<(), AlterError> {
match self {
Instance::Vecf32Cos(x) => x.setting(key, value),
Instance::Vecf32Dot(x) => x.setting(key, value),
Instance::Vecf32L2(x) => x.setting(key, value),
Instance::Vecf16Cos(x) => x.setting(key, value),
Instance::Vecf16Dot(x) => x.setting(key, value),
Instance::Vecf16L2(x) => x.setting(key, value),
Instance::SVecf32Cos(x) => x.setting(key, value),
Instance::SVecf32Dot(x) => x.setting(key, value),
Instance::SVecf32L2(x) => x.setting(key, value),
Instance::BVecf32Cos(x) => x.setting(key, value),
Instance::BVecf32Dot(x) => x.setting(key, value),
Instance::BVecf32L2(x) => x.setting(key, value),
Instance::BVecf32Jaccard(x) => x.setting(key, value),
Instance::Veci8L2(x) => x.setting(key, value),
Instance::Veci8Cos(x) => x.setting(key, value),
Instance::Veci8Dot(x) => x.setting(key, value),
Instance::Vecf32Cos(x) => x.alter(key, value),
Instance::Vecf32Dot(x) => x.alter(key, value),
Instance::Vecf32L2(x) => x.alter(key, value),
Instance::Vecf16Cos(x) => x.alter(key, value),
Instance::Vecf16Dot(x) => x.alter(key, value),
Instance::Vecf16L2(x) => x.alter(key, value),
Instance::SVecf32Cos(x) => x.alter(key, value),
Instance::SVecf32Dot(x) => x.alter(key, value),
Instance::SVecf32L2(x) => x.alter(key, value),
Instance::BVecf32Cos(x) => x.alter(key, value),
Instance::BVecf32Dot(x) => x.alter(key, value),
Instance::BVecf32L2(x) => x.alter(key, value),
Instance::BVecf32Jaccard(x) => x.alter(key, value),
Instance::Veci8L2(x) => x.alter(key, value),
Instance::Veci8Cos(x) => x.alter(key, value),
Instance::Veci8Dot(x) => x.alter(key, value),
}
}
pub fn start(&self) {
Expand Down
6 changes: 3 additions & 3 deletions crates/service/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ impl WorkerOperations for Worker {
let stat = instance.stat();
Ok(stat)
}
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> {
fn alter(&self, handle: Handle, key: String, value: String) -> Result<(), AlterError> {
let view = self.view();
let instance = view.get(handle).ok_or(SettingError::NotExist)?;
instance.setting(key, value)
let instance = view.get(handle).ok_or(AlterError::NotExist)?;
instance.alter(key, value)
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/bgworker/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ fn session(worker: Arc<Worker>, handler: ServerRpcHandler) -> Result<Infallible,
ServerRpcHandle::Stat { handle, x } => {
handler = x.leave(worker.stat(handle))?;
}
ServerRpcHandle::Setting {
ServerRpcHandle::Alter {
handle,
key,
value,
x,
} => {
handler = x.leave(worker.setting(handle, key, value))?;
handler = x.leave(worker.alter(handle, key, value))?;
}
ServerRpcHandle::Basic {
handle,
Expand Down
2 changes: 1 addition & 1 deletion src/index/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pgrx::error;
fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) {
let id = from_oid_to_handle(oid);
let mut rpc = check_client(client());
match rpc.setting(id, key, value) {
match rpc.alter(id, key, value) {
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
}
Expand Down
2 changes: 1 addition & 1 deletion src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,5 +329,5 @@ defines! {
stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer;
stream list(handle: Handle) -> Pointer;
unary stat(handle: Handle) -> IndexStat;
unary setting(handle: Handle, key: String, value: String) -> ();
unary alter(handle: Handle, key: String, value: String) -> ();
}

0 comments on commit 3b3fd0e

Please sign in to comment.