Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add optimizing_threads to function #375

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions crates/base/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,41 @@ pub enum StatError {
NotExist,
}

/// Errors reported when changing a setting of an index.
#[must_use]
#[derive(Debug, Clone, Error, Serialize, Deserialize)]
pub enum SettingError {
    /// The setting key is not one of the recognized keys.
    #[error("Setting key {key} is not exist.")]
    BadKey { key: String },
    /// The key is recognized but the supplied value could not be accepted for it.
    #[error("Setting key {key} has a wrong value {value}.")]
    BadValue { key: String, value: String },
    /// No index is registered for the requested handle.
    #[error("Index not found.")]
    NotExist,
}

/// Options of an index that can be replaced while the index exists.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct IndexFlexibleOptions {
    /// Number of threads used for optimizing. When the field is missing from the
    /// serialized form, `IndexFlexibleOptions::default_optimizing_threads` supplies it.
    #[serde(default = "IndexFlexibleOptions::default_optimizing_threads")]
    pub optimizing_threads: u16,
}

impl IndexFlexibleOptions {
pub fn default_optimizing_threads() -> u16 {
match std::thread::available_parallelism() {
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

impl Default for IndexFlexibleOptions {
    /// Builds the options with every field set to its default value.
    fn default() -> Self {
        let optimizing_threads = Self::default_optimizing_threads();
        Self { optimizing_threads }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
#[serde(deny_unknown_fields)]
#[validate(schema(function = "IndexOptions::validate_index_options"))]
Expand Down Expand Up @@ -198,9 +233,6 @@ pub struct OptimizingOptions {
#[serde(default = "OptimizingOptions::default_delete_threshold")]
#[validate(range(min = 0.01, max = 1.00))]
pub delete_threshold: f64,
#[serde(default = "OptimizingOptions::default_optimizing_threads")]
#[validate(range(min = 1, max = 65535))]
pub optimizing_threads: usize,
}

impl OptimizingOptions {
Expand All @@ -213,12 +245,6 @@ impl OptimizingOptions {
fn default_delete_threshold() -> f64 {
0.2
}
fn default_optimizing_threads() -> usize {
match std::thread::available_parallelism() {
Ok(threads) => (threads.get() as f64).sqrt() as _,
Err(_) => 1,
}
}
}

impl Default for OptimizingOptions {
Expand All @@ -227,7 +253,6 @@ impl Default for OptimizingOptions {
sealing_secs: Self::default_sealing_secs(),
sealing_size: Self::default_sealing_size(),
delete_threshold: Self::default_delete_threshold(),
optimizing_threads: Self::default_optimizing_threads(),
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub trait WorkerOperations {
fn view_vbase(&self, handle: Handle) -> Result<impl ViewVbaseOperations, VbaseError>;
fn view_list(&self, handle: Handle) -> Result<impl ViewListOperations, ListError>;
fn stat(&self, handle: Handle) -> Result<IndexStat, StatError>;
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError>;
}

pub trait ViewBasicOperations {
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub fn clean(path: impl AsRef<Path>, wanted: impl Iterator<Item = String>) {
.unwrap();
let wanted = HashSet::<String>::from_iter(wanted);
for dir in dirs {
if dir.path().is_file() {
log::info!("Unexpected file {:?}, skip.", dir.path());
}
let filename = dir.file_name();
let filename = filename.to_str().unwrap();
let p = path.as_ref().join(filename);
Expand Down
39 changes: 39 additions & 0 deletions crates/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::Infallible;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Instant;
Expand Down Expand Up @@ -63,6 +64,7 @@ pub struct Index<O: Op> {

impl<O: Op> Index<O> {
pub fn create(path: PathBuf, options: IndexOptions) -> Result<Arc<Self>, CreateError> {
let flexible = IndexFlexibleOptions::default();
if let Err(err) = options.validate() {
return Err(CreateError::InvalidIndexOptions {
reason: err.to_string(),
Expand All @@ -80,6 +82,7 @@ impl<O: Op> Index<O> {
IndexStartup {
sealeds: HashSet::new(),
growings: HashSet::new(),
flexible,
},
);
let delete = Delete::create(path.join("delete"));
Expand Down Expand Up @@ -188,6 +191,28 @@ impl<O: Op> Index<O> {
/// Returns the index view currently held in `self.view`, as a shared `Arc`.
pub fn view(&self) -> Arc<IndexView<O>> {
self.view.load_full()
}
pub fn setting(self: &Arc<Self>, key: String, value: String) -> Result<(), SettingError> {
let mut protect = self.protect.lock();
match key.as_str() {
"optimizing.threads" => {
let parsed = i32::from_str(value.as_str())
.map_err(|_e| SettingError::BadValue { key, value })?;
let optimizing_threads = match parsed {
-1 => IndexFlexibleOptions::default_optimizing_threads(),
threads_limit => threads_limit as u16,
};
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
protect.flexible_set(IndexFlexibleOptions { optimizing_threads });
let mut background = self.background_indexing.lock();
if let Some((sender, join_handle)) = background.take() {
drop(sender);
let _ = join_handle.join();
*background = Some(OptimizerIndexing::new(self.clone()).spawn());
}
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
}
_ => return Err(SettingError::BadKey { key }),
};
Ok(())
}
pub fn refresh(&self) {
let mut protect = self.protect.lock();
if let Some((uuid, write)) = protect.write.clone() {
Expand Down Expand Up @@ -509,6 +534,7 @@ impl<O: Op> IndexView<O> {
/// State bundle stored through `startup.set(..)` and read back with `startup.get()`.
struct IndexStartup {
// NOTE(review): presumably the ids of sealed segments — confirm against the writer.
sealeds: HashSet<Uuid>,
// NOTE(review): presumably the ids of growing segments — confirm against the writer.
growings: HashSet<Uuid>,
// Runtime-adjustable options kept alongside the two id sets.
flexible: IndexFlexibleOptions,
}

struct IndexProtect<O: Op> {
Expand Down Expand Up @@ -538,7 +564,20 @@ impl<O: Op> IndexProtect<O> {
self.startup.set(IndexStartup {
sealeds: startup_sealeds,
growings: startup_growings,
flexible: self.flexible_get(),
});
swap.swap(view);
}
/// Replaces the stored flexible options while keeping the recorded id sets.
///
/// Reads the current `IndexStartup` through `self.startup.get()` and writes back a
/// new one whose `sealeds` and `growings` are copied unchanged and whose `flexible`
/// is the supplied value.
fn flexible_set(&mut self, flexible: IndexFlexibleOptions) {
    let src = self.startup.get();
    self.startup.set(IndexStartup {
        sealeds: src.sealeds.clone(),
        // `growings` must come from `growings`; copying `sealeds` here would
        // overwrite the growing set with the sealed one on every settings change.
        growings: src.growings.clone(),
        flexible,
    });
}
/// Returns a clone of the flexible options stored in the current `IndexStartup`.
fn flexible_get(&self) -> IndexFlexibleOptions {
    self.startup.get().flexible.clone()
}
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 5 additions & 1 deletion crates/index/src/optimizing/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ impl<O: Op> OptimizerIndexing<O> {
}
fn main(self, shutdown: Receiver<Infallible>) {
let index = self.index;
let threads = {
let protect = index.protect.lock();
protect.flexible_get().optimizing_threads
};
rayon::ThreadPoolBuilder::new()
.num_threads(index.options.optimizing.optimizing_threads)
.num_threads(threads as usize)
.build_scoped(|pool| {
std::thread::scope(|scope| {
scope.spawn(|| match shutdown.recv() {
Expand Down
20 changes: 20 additions & 0 deletions crates/service/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,26 @@ impl Instance {
Instance::Veci8Dot(x) => x.stat(),
}
}
/// Forwards a setting change to the index wrapped by this instance.
///
/// Every variant delegates to its inner value's `setting(key, value)` and returns
/// that result unchanged.
pub fn setting(&self, key: String, value: String) -> Result<(), SettingError> {
    match self {
        Instance::Vecf32Cos(inner) => inner.setting(key, value),
        Instance::Vecf32Dot(inner) => inner.setting(key, value),
        Instance::Vecf32L2(inner) => inner.setting(key, value),
        Instance::Vecf16Cos(inner) => inner.setting(key, value),
        Instance::Vecf16Dot(inner) => inner.setting(key, value),
        Instance::Vecf16L2(inner) => inner.setting(key, value),
        Instance::SVecf32Cos(inner) => inner.setting(key, value),
        Instance::SVecf32Dot(inner) => inner.setting(key, value),
        Instance::SVecf32L2(inner) => inner.setting(key, value),
        Instance::BVecf32Cos(inner) => inner.setting(key, value),
        Instance::BVecf32Dot(inner) => inner.setting(key, value),
        Instance::BVecf32L2(inner) => inner.setting(key, value),
        Instance::BVecf32Jaccard(inner) => inner.setting(key, value),
        Instance::Veci8L2(inner) => inner.setting(key, value),
        Instance::Veci8Cos(inner) => inner.setting(key, value),
        Instance::Veci8Dot(inner) => inner.setting(key, value),
    }
}
pub fn start(&self) {
match self {
Instance::Vecf32Cos(x) => x.start(),
Expand Down
5 changes: 5 additions & 0 deletions crates/service/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ impl WorkerOperations for Worker {
let stat = instance.stat();
Ok(stat)
}
/// Applies a setting change to the index identified by `handle`.
///
/// Returns `SettingError::NotExist` when the current view has no instance for
/// `handle`; otherwise returns whatever the instance's `setting` returns.
fn setting(&self, handle: Handle, key: String, value: String) -> Result<(), SettingError> {
    let view = self.view();
    match view.get(handle) {
        Some(instance) => instance.setting(key, value),
        None => Err(SettingError::NotExist),
    }
}
}

pub struct WorkerView {
Expand Down
8 changes: 8 additions & 0 deletions src/bgworker/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ fn session(worker: Arc<Worker>, handler: ServerRpcHandler) -> Result<Infallible,
ServerRpcHandle::Stat { handle, x } => {
handler = x.leave(worker.stat(handle))?;
}
ServerRpcHandle::Setting {
handle,
key,
value,
x,
} => {
handler = x.leave(worker.setting(handle, key, value))?;
}
ServerRpcHandle::Basic {
handle,
vector,
Expand Down
11 changes: 11 additions & 0 deletions src/index/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ use crate::error::*;
use crate::index::utils::from_oid_to_handle;
use crate::ipc::client;
use base::index::*;
use pgrx::error;

#[pgrx::pg_extern(volatile, strict)]
fn _vectors_alter_vector_index(oid: pgrx::pg_sys::Oid, key: String, value: String) {
let id = from_oid_to_handle(oid);
let mut rpc = check_client(client());
match rpc.setting(id, key, value) {
Ok(_) => {}
Err(e) => error!("{}", e.to_string()),
}
}

#[pgrx::pg_extern(volatile, strict, parallel_safe)]
fn _vectors_index_stat(
Expand Down
1 change: 1 addition & 0 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,4 +329,5 @@ defines! {
stream vbase(handle: Handle, vector: OwnedVector, opts: SearchOptions) -> Pointer;
stream list(handle: Handle) -> Pointer;
unary stat(handle: Handle) -> IndexStat;
unary setting(handle: Handle, key: String, value: String) -> ();
cutecutecat marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 3 additions & 0 deletions src/sql/finalize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,9 @@ BEGIN
END;
$$;

-- alter_vector_index(index, key, value): changes one setting of a vector index.
-- Bound to the C-language symbol '_vectors_alter_vector_index_wrapper' of the
-- extension module. STRICT: the function is not called when any argument is NULL.
CREATE FUNCTION alter_vector_index("index" OID, "key" TEXT, "value" TEXT) RETURNS void
STRICT LANGUAGE c AS 'MODULE_PATHNAME', '_vectors_alter_vector_index_wrapper';

-- List of casts

CREATE CAST (real[] AS vector)
Expand Down
37 changes: 37 additions & 0 deletions tests/sqllogictest/index_edit.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Exercises alter_vector_index: error paths for an unknown index, an unknown key
# and an unparsable value, then a successful change followed by a query.
statement ok
SET search_path TO pg_temp, vectors;

statement ok
CREATE TABLE t (val vector(3));

# 1000 random 3-dimensional vectors.
statement ok
INSERT INTO t (val) SELECT ARRAY[random(), random(), random()]::real[] FROM generate_series(1, 1000);

statement ok
CREATE INDEX hnsw_1 ON t USING vectors (val vector_l2_ops)
WITH (options = "[indexing.hnsw]");

# Baseline query before any setting is changed.
# NOTE(review): the index is built with vector_l2_ops while the query orders by <#>;
# confirm this query is meant to go through the index.
query I
SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2;
----
10

# The regclass cast of a missing relation fails before the function is called.
statement error does not exist
SELECT alter_vector_index('unknown_index'::regclass::oid, 'optimizing.threads', '1');

# Unknown key: expects the "Setting key ..." error text.
statement error Setting key
SELECT alter_vector_index('hnsw_1'::regclass::oid, 'unknown_key', '1');

# Non-numeric value for a known key: expects the "... wrong value ..." error text.
statement error wrong value
SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', 'unknown_value');

# Valid change.
statement ok
SELECT alter_vector_index('hnsw_1'::regclass::oid, 'optimizing.threads', '1');

# The same query must still return 10 rows after the change.
query I
SELECT COUNT(1) FROM (SELECT 1 FROM t ORDER BY val <#> '[0.5,0.5,0.5]' limit 10) t2;
----
10

statement ok
DROP TABLE t;
Loading