From 38932e5cc83ef6f0f9604726d4990bffe2016e1b Mon Sep 17 00:00:00 2001 From: cutecutecat Date: Wed, 21 Feb 2024 19:48:18 +0800 Subject: [PATCH] feat: add optimizing_threads to function Signed-off-by: cutecutecat --- Cargo.lock | 21 +++++++++--- Cargo.toml | 1 - crates/base/src/error.rs | 13 ++++++++ crates/base/src/index.rs | 25 +++++++++++---- crates/base/src/worker.rs | 1 + crates/service/src/index/mod.rs | 21 ++++++++++++ .../service/src/index/optimizing/indexing.rs | 20 +++++++++--- crates/service/src/instance/mod.rs | 14 ++++++++ crates/service/src/worker/mod.rs | 5 +++ src/bgworker/normal.rs | 8 +++++ src/embedding/mod.rs | 1 - src/index/views.rs | 15 +++++++++ src/ipc/mod.rs | 1 + src/sql/finalize.sql | 3 ++ tests/sqllogictest/index_edit.slt | 32 +++++++++++++++++++ 15 files changed, 164 insertions(+), 17 deletions(-) create mode 100644 tests/sqllogictest/index_edit.slt diff --git a/Cargo.lock b/Cargo.lock index d7dfe3da3..a83cb1248 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,16 +147,18 @@ dependencies = [ [[package]] name = "async-executor" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +checksum = "601848938f6e24595d8f63bd4c4e98ed905a3bfa832743c7b8298a73d1b0517b" dependencies = [ "async-lock 3.3.0", "async-task", + "atomic-waker", "concurrent-queue", "fastrand 2.0.1", "futures-lite 2.2.0", "slab", + "thread_local", ] [[package]] @@ -498,9 +500,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.15.1" +version = "3.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c764d619ca78fccbf3069b37bd7af92577f044bb15236036662d79b6559f25b7" +checksum = "a3b1be7772ee4501dba05acbe66bb1e8760f6a6c474a36035631638e4415f130" [[package]] name = "bytemuck" @@ -2932,6 +2934,16 @@ dependencies = [ "syn 2.0.50", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -3259,7 +3271,6 @@ dependencies = [ "detect", "embedding", "env_logger", - "httpmock", "interprocess_atomic_wait", "libc", "log", diff --git a/Cargo.toml b/Cargo.toml index de85e0490..f0a7f1eb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,6 @@ toml = "0.8.10" [dev-dependencies] pgrx-tests = "0.11.3" -httpmock = "0.7" [patch.crates-io] pgrx = { git = "https://github.com/tensorchord/pgrx.git", branch = "v0.11.3-patch" } diff --git a/crates/base/src/error.rs b/crates/base/src/error.rs index 91781b7f5..ffb1ffa66 100644 --- a/crates/base/src/error.rs +++ b/crates/base/src/error.rs @@ -93,3 +93,16 @@ pub enum StatError { #[error("Maintenance should be done.")] Upgrade, } + +#[must_use] +#[derive(Debug, Clone, Error, Serialize, Deserialize)] +pub enum SettingError { + #[error("Setting key {key} is not exist.")] + BadKey { key: String }, + #[error("Setting key {key} has a wrong value {value}.")] + BadValue { key: String, value: String }, + #[error("Index not found.")] + NotExist, + #[error("Maintenance should be done.")] + Upgrade, +} diff --git a/crates/base/src/index.rs b/crates/base/src/index.rs index c7a8a3825..eb12bbb2d 100644 --- a/crates/base/src/index.rs +++ b/crates/base/src/index.rs @@ -1,3 +1,6 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; + use crate::distance::*; use crate::vector::*; use serde::{Deserialize, Serialize}; @@ -88,7 +91,7 @@ impl Default for SegmentsOptions { } } -#[derive(Debug, Clone, Serialize, Deserialize, Validate)] +#[derive(Debug, Serialize, Deserialize, Validate)] #[serde(deny_unknown_fields)] pub struct OptimizingOptions { #[serde(default = "OptimizingOptions::default_sealing_secs")] @@ -101,8 +104,18 @@ pub struct OptimizingOptions { #[validate(range(min = 0.01, max = 1.00))] pub delete_threshold: f64, #[serde(default = "OptimizingOptions::default_optimizing_threads")] - #[validate(range(min = 1, max = 65535))] - pub optimizing_threads: usize, + pub optimizing_threads: AtomicUsize, +} + +impl Clone for OptimizingOptions { + fn clone(&self) -> Self { + OptimizingOptions { + sealing_secs: self.sealing_secs, + sealing_size: self.sealing_size, + delete_threshold: self.delete_threshold, + optimizing_threads: AtomicUsize::new(self.optimizing_threads.load(Ordering::Relaxed)), + } + } } impl OptimizingOptions { @@ -115,10 +128,10 @@ impl OptimizingOptions { fn default_delete_threshold() -> f64 { 0.2 } - fn default_optimizing_threads() -> usize { + fn default_optimizing_threads() -> AtomicUsize { match std::thread::available_parallelism() { - Ok(threads) => (threads.get() as f64).sqrt() as _, - Err(_) => 1, + Ok(threads) => AtomicUsize::new((threads.get() as f64).sqrt() as _), + Err(_) => AtomicUsize::new(1), } } } diff --git a/crates/base/src/worker.rs b/crates/base/src/worker.rs index 124df5804..6648ec20d 100644 --- a/crates/base/src/worker.rs +++ b/crates/base/src/worker.rs @@ -18,6 +18,7 @@ pub trait WorkerOperations { fn view_vbase(&self, handle: Handle) -> Result; fn view_list(&self, handle: Handle) -> Result; fn stat(&self, handle: Handle) -> Result; + fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>; } pub trait ViewBasicOperations { diff --git a/crates/service/src/index/mod.rs b/crates/service/src/index/mod.rs index 69384d119..a42d326f9 100644 --- a/crates/service/src/index/mod.rs +++ b/crates/service/src/index/mod.rs @@ -21,6 +21,8 @@ use std::cmp::Reverse; use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; +use std::str::FromStr; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Instant; use thiserror::Error; @@ -170,6 +172,25 @@ impl Index { pub fn view(&self) -> Arc> { self.view.load_full() } + pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> { + match key.as_str() { + "optimizing.threads" => { + let parsed = usize::from_str(value.as_str()) + .map_err(|_e| SettingError::BadValue { key, value })?; + self.options + .optimizing + .optimizing_threads + .store(parsed, Ordering::Relaxed); + } + _ => return Err(SettingError::BadKey { key }), + }; + std::fs::write( + self.path.join("options"), + serde_json::to_string::(&self.options).unwrap(), + ) + .unwrap(); + Ok(()) + } pub fn refresh(&self) { let mut protect = self.protect.lock(); if let Some((uuid, write)) = protect.write.clone() { diff --git a/crates/service/src/index/optimizing/indexing.rs b/crates/service/src/index/optimizing/indexing.rs index 6aa9c11cf..91acd6f41 100644 --- a/crates/service/src/index/optimizing/indexing.rs +++ b/crates/service/src/index/optimizing/indexing.rs @@ -3,6 +3,7 @@ use crate::index::Index; use crate::index::SealedSegment; use crate::prelude::*; use std::cmp::Reverse; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Instant; use thiserror::Error; @@ -23,10 +24,6 @@ impl OptimizerIndexing { } pub fn main(self) { let index = self.index; - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(index.options.optimizing.optimizing_threads) - .build() - .unwrap(); let weak_index = Arc::downgrade(&index); drop(index); loop { @@ -34,6 +31,21 @@ impl OptimizerIndexing { let Some(index) = weak_index.upgrade() else { return; }; + let threads = match index + .options + .optimizing + .optimizing_threads + .load(Ordering::Relaxed) + { + 0 => OptimizingOptions::default() + .optimizing_threads + .load(Ordering::Relaxed), + threads_limit => threads_limit, + }; + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(threads) + .build() + .unwrap(); if let Ok(()) = pool.install(|| optimizing_indexing(index.clone())) { continue; } diff --git a/crates/service/src/instance/mod.rs b/crates/service/src/instance/mod.rs index da5b6f2c6..fdd3912ac 100644 --- a/crates/service/src/instance/mod.rs +++ b/crates/service/src/instance/mod.rs @@ -133,6 +133,20 @@ impl Instance { Instance::Upgrade => None, } } + pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> { + match self { + Instance::Vecf32Cos(x) => x.setting(key, value), + Instance::Vecf32Dot(x) => x.setting(key, value), + Instance::Vecf32L2(x) => x.setting(key, value), + Instance::Vecf16Cos(x) => x.setting(key, value), + Instance::Vecf16Dot(x) => x.setting(key, value), + Instance::Vecf16L2(x) => x.setting(key, value), + Instance::SVecf32L2(x) => x.setting(key, value), + Instance::SVecf32Cos(x) => x.setting(key, value), + Instance::SVecf32Dot(x) => x.setting(key, value), + Instance::Upgrade => Err(SettingError::Upgrade), + } + } } pub enum InstanceView { diff --git a/crates/service/src/worker/mod.rs b/crates/service/src/worker/mod.rs index 104b66740..0de49b8c6 100644 --- a/crates/service/src/worker/mod.rs +++ b/crates/service/src/worker/mod.rs @@ -143,6 +143,11 @@ impl WorkerOperations for Worker { let stat = instance.stat().ok_or(StatError::Upgrade)?; Ok(stat) } + fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> { + let view = self.view(); + let instance = view.get(handle).ok_or(SettingError::NotExist)?; + instance.setting(key, value) + } } pub struct WorkerView { diff --git a/src/bgworker/normal.rs b/src/bgworker/normal.rs index 7979f9449..6d51c460c 100644 --- a/src/bgworker/normal.rs +++ b/src/bgworker/normal.rs @@ -86,6 +86,14 @@ fn session(worker: Arc, handler: ServerRpcHandler) -> Result { handler = x.leave(worker.delete(handle, pointer))?; } + ServerRpcHandle::Setting { + handle, + key, + value, + x, + } => { + handler = x.leave(worker.setting(handle, key, value))?; + } ServerRpcHandle::Stat { handle, x } => { handler = x.leave(worker.stat(handle))?; } diff --git a/src/embedding/mod.rs b/src/embedding/mod.rs index 7ea35378b..3785bda6d 100644 --- a/src/embedding/mod.rs +++ b/src/embedding/mod.rs @@ -15,6 +15,5 @@ fn _vectors_text2vec_openai(input: String, model: String) -> Vecf32Output { Ok(emb) => emb.into_iter().map(F32).collect::>(), Err(e) => error!("{}", e.to_string()), }; - Vecf32Output::new(Vecf32Borrowed::new(&embedding)) } diff --git a/src/index/views.rs b/src/index/views.rs index 6163a538d..bb11e7793 100644 --- a/src/index/views.rs +++ b/src/index/views.rs @@ -1,4 +1,19 @@ use crate::prelude::*; +use pgrx::error; +use pgrx::pg_sys::{AsPgCStr, RelnameGetRelid}; + +#[pgrx::pg_extern(volatile, strict)] +fn _vectors_index_setting(index: String, key: String, value: String) { + unsafe { + let oid = RelnameGetRelid(index.as_pg_cstr()); + let id = Handle::from_sys(oid); + let mut rpc = check_client(crate::ipc::client()); + match rpc.setting(id, key, value) { + Ok(_) => {} + Err(e) => error!("{}", e.to_string()), + } + } +} #[pgrx::pg_extern(volatile, strict)] fn _vectors_index_stat( diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index 3ce17a076..2a2ee1b80 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -325,4 +325,5 @@ defines! { stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer; stream list(handle: Handle) -> Pointer; unary stat(handle: Handle) -> IndexStat; + unary setting(handle: Handle, key: String, value: String) -> (); } diff --git a/src/sql/finalize.sql b/src/sql/finalize.sql index a29560753..92ea2a9a4 100644 --- a/src/sql/finalize.sql +++ b/src/sql/finalize.sql @@ -360,6 +360,9 @@ BEGIN END; $$; +CREATE FUNCTION alter_vector_index(input TEXT, "key" TEXT, "value" TEXT) RETURNS void +STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_index_setting_wrapper'; + -- List of casts CREATE CAST (real[] AS vector) diff --git a/tests/sqllogictest/index_edit.slt b/tests/sqllogictest/index_edit.slt new file mode 100644 index 000000000..266b6cc71 --- /dev/null +++ b/tests/sqllogictest/index_edit.slt @@ -0,0 +1,32 @@ +statement ok +SET search_path TO pg_temp, vectors; + +statement ok +CREATE TABLE t (val vector(3)); + +statement ok +INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000); + +statement ok +CREATE INDEX hnsw_1 ON t USING vectors (val vector_l2_ops) +WITH (options = "[indexing.hnsw]"); + +query I +SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2; +---- +10 + +statement error Index not found. +SELECT alter_vector_index('unknown_index', 'optimizing.threads', '1'); + +statement error Setting key +SELECT alter_vector_index('hnsw_1', 'unknown_key', '1'); + +statement error wrong value +SELECT alter_vector_index('hnsw_1', 'optimizing.threads', 'unknown_value'); + +statement ok +SELECT alter_vector_index('hnsw_1', 'optimizing.threads', '1'); + +statement ok +DROP TABLE t; \ No newline at end of file