diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000000..634eb0b5ff
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "faiss-rs"]
+	path = faiss-rs
+	url = git@github.com:Enet4/faiss-rs.git
diff --git a/Cargo.lock b/Cargo.lock
index d7b03494e1..b43a09a63f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -630,6 +630,15 @@ version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce"
 
+[[package]]
+name = "cmake"
+version = "0.1.50"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a31c789563b815f77f4250caee12365734369f942439b7defd71e18a48197130"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "colorchoice"
 version = "1.0.0"
@@ -687,6 +696,56 @@ version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
 
+[[package]]
+name = "cpp"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa65869ef853e45c60e9828aa08cdd1398cb6e13f3911d9cb2a079b144fcd64"
+dependencies = [
+ "cpp_macros",
+]
+
+[[package]]
+name = "cpp_build"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e361fae2caf9758164b24da3eedd7f7d7451be30d90d8e7b5d2be29a2f0cf5b"
+dependencies = [
+ "cc",
+ "cpp_common",
+ "lazy_static",
+ "proc-macro2",
+ "regex",
+ "syn 2.0.50",
+ "unicode-xid",
+]
+
+[[package]]
+name = "cpp_common"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e1a2532e4ed4ea13031c13bc7bc0dbca4aae32df48e9d77f0d1e743179f2ea1"
+dependencies = [
+ "lazy_static",
+ "proc-macro2",
+ "syn 2.0.50",
+]
+
+[[package]]
+name = "cpp_macros"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "47ec9cc90633446f779ef481a9ce5a0077107dd5b87016440448d908625a83fd"
+dependencies = [
+ "aho-corasick",
+ "byteorder",
+ "cpp_common",
+ "lazy_static",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.50",
+]
+
 [[package]]
 name = "cpufeatures"
 version = "0.2.12"
@@ -716,6 +775,7 @@ dependencies = [
  "ciborium",
  "clap",
  "criterion-plot",
+ "futures",
  "is-terminal",
  "itertools 0.10.5",
  "num-traits",
@@ -728,6 +788,7 @@ dependencies = [
  "serde_derive",
  "serde_json",
  "tinytemplate",
+ "tokio",
  "walkdir",
 ]
@@ -979,6 +1040,24 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "disk-faiss"
+version = "0.7.0"
+dependencies = [
+ "cpp",
+ "cpp_build",
+]
+
+[[package]]
+name = "disk-test"
+version = "0.7.0"
+dependencies = [
+ "criterion",
+ "disk-faiss",
+ "faiss",
+ "rand 0.8.5",
+]
+
 [[package]]
 name = "display-error-chain"
 version = "0.2.0"
@@ -1128,6 +1207,20 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "faiss"
+version = "0.12.2-alpha.0"
+dependencies = [
+ "faiss-sys",
+]
+
+[[package]]
+name = "faiss-sys"
+version = "0.6.3-alpha.0"
+dependencies = [
+ "cmake",
+]
+
 [[package]]
 name = "fast_chemail"
 version = "0.9.6"
@@ -2845,6 +2938,7 @@ dependencies = [
  "display-error-chain",
  "dotenv",
  "enum_dispatch",
+ "faiss",
  "flate2",
  "futures-util",
  "genawaiter",
@@ -2889,10 +2983,12 @@ name = "raphtory-benchmark"
 version = "0.7.0"
 dependencies = [
  "criterion",
+ "faiss",
  "rand 0.8.5",
  "raphtory",
  "rayon",
  "sorted_vector_map",
+ "tokio",
 ]
 
 [[package]]
@@ -4273,6 +4369,12 @@ dependencies = [
  "tinyvec",
 ]
 
+[[package]]
+name = "unicode-xid"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" + [[package]] name = "unindent" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index 3e6df1be68..7788a9877e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,13 +1,17 @@ [workspace] members = [ - "raphtory", - "raphtory-benchmark", - "examples/rust", - "examples/netflow", - "python", - "js-raphtory", - "raphtory-graphql", - "comparison-benchmark/rust/raphtory-rust-benchmark" + "raphtory", + "raphtory-benchmark", + "examples/rust", + "examples/netflow", + "python", + "js-raphtory", + "raphtory-graphql", + "comparison-benchmark/rust/raphtory-rust-benchmark", + "faiss-rs", + "faiss-rs/faiss-sys", + "disk-faiss", + "disk-test", ] default-members = ["raphtory"] @@ -25,4 +29,4 @@ edition = "2021" [profile.release-with-debug] inherits = "release" -debug = true \ No newline at end of file +debug = true diff --git a/disk-faiss/Cargo.toml b/disk-faiss/Cargo.toml new file mode 100644 index 0000000000..feb5b0a777 --- /dev/null +++ b/disk-faiss/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "disk-faiss" +# description = "Raphtory GraphQL server" +edition.workspace = true +rust-version.workspace = true +version.workspace = true +keywords.workspace = true +authors.workspace = true +documentation.workspace = true +repository.workspace = true +license.workspace = true +readme.workspace = true +homepage.workspace = true +# links = "faiss_c" +build = "build.rs" + +[dependencies] +cpp = "0.5.4" + +[build-dependencies] +cpp_build = "0.5.4" diff --git a/disk-faiss/build.rs b/disk-faiss/build.rs new file mode 100644 index 0000000000..205bc9a34b --- /dev/null +++ b/disk-faiss/build.rs @@ -0,0 +1,42 @@ +use std::path::PathBuf; + +extern crate cpp_build; + +fn main() { + if let Ok(paths) = std::env::var("LD_LIBRARY_PATH") { + for path in paths.split(":") { + if path != "" { + println!("cargo:rustc-link-search={}", path); + } + } + }; + + println!("cargo:rustc-link-search=/usr/local/lib"); + println!("cargo:rustc-link-search=/usr/lib"); + + if get_os_type() == "macos" { + println!("cargo:rustc-link-lib=omp"); + println!("cargo:rustc-link-lib=faiss"); + } else { + println!("cargo:rustc-link-lib=static=faiss"); + println!("cargo:rustc-link-lib=gomp"); + println!("cargo:rustc-link-lib=blas"); + println!("cargo:rustc-link-lib=lapack"); + } + + cpp_build::Config::new() + .include(PathBuf::from( + "/Users/pedrorico/pometry/raphtory/faiss-rs/faiss-sys/faiss", + )) + .build("src/lib.rs"); +} + +fn get_os_type() -> &'static str { + if cfg!(target_os = "linux") { + return "linux"; + } else if cfg!(target_os = "macos") { + return "macos"; + } else { + panic!("unknow os type"); + } +} diff --git a/disk-faiss/src/lib.rs b/disk-faiss/src/lib.rs new file mode 100644 index 0000000000..11f8f5182b --- /dev/null +++ b/disk-faiss/src/lib.rs @@ -0,0 +1,96 @@ +#![recursion_limit = "512"] +// #![cfg_attr(not(test), allow(dead_code, unused_imports))] +// #![allow(unused)] + +// #[macro_use] +// extern crate cpp; + +use cpp::cpp; + +cpp! 
+    #include <vector>
+    #include <iostream>
+    #include <cassert>
+    #include <faiss/index_io.h>
+    #include <faiss/IndexIVF.h>
+    #include <faiss/invlists/OnDiskInvertedLists.h>
+}}
+
+pub fn merge_ondisk(index: &str, shards: Vec<&str>, ivfdata: &str, output: &str) {
+    let index_path = std::ffi::CString::new(index).unwrap();
+    let index_path = index_path.as_ptr();
+
+    let shards: Vec<_> = shards
+        .iter()
+        .map(|shard| std::ffi::CString::new(*shard).unwrap())
+        .collect();
+    let shards: Vec<_> = shards.iter().map(|shard| shard.as_ptr()).collect();
+    let shards = &shards;
+    let num_shards: u32 = shards.len() as u32;
+
+    let ivfdata = std::ffi::CString::new(ivfdata).unwrap();
+    let ivfdata = ivfdata.as_ptr();
+
+    let output = std::ffi::CString::new(output).unwrap();
+    let output = output.as_ptr();
+
+    unsafe {
+        cpp!([index_path as "const char *", shards as "std::vector<const char *> *", num_shards as "uint32_t", ivfdata as "const char *", output as "const char *"] {
+            try {
+                std::vector<const faiss::InvertedLists *> ivfs;
+                std::cout << "reading shards -> " << shards->size() << std::endl;
+                size_t ntotal = 0;
+                for (unsigned int i = 0; i < num_shards; ++i) {
+                    const char * shard = shards->at(i);
+                    auto index = faiss::read_index(shard, faiss::IO_FLAG_MMAP);
+                    auto ivf = dynamic_cast<faiss::IndexIVF *>(index);
+                    assert(ivf);
+
+                    ivfs.push_back(ivf->invlists);
+                    ntotal += ivf->ntotal;
+
+                    // ivf->own_invlists = false;
+                    // delete ivf;
+                }
+
+                auto index_raw = faiss::read_index(index_path);
+                auto index = dynamic_cast<faiss::IndexIVF *>(index_raw);
+                assert(index);
+
+                if (index->ntotal != 0) {
+                    std::exit(1);
+                }
+
+                auto il = new faiss::OnDiskInvertedLists(index->nlist, index->code_size, ivfdata);
+                il->merge_from(ivfs.data(), ivfs.size());
+
+                index->replace_invlists(il, true);
+                index->ntotal = ntotal;
+
+                // auto invlists = new faiss::OnDiskInvertedLists(
+                //     index->nlist, index->code_size, ivfdata
+                // );
+                // std::cout << "----here----" << std::endl;
+
+                // const faiss::InvertedLists **ivfs_data = (const faiss::InvertedLists**) ivfs.data();
+                // std::cout << "---- about to merge lists with size ---- " << ivfs.size() << std::endl;
+                // auto ntotal = invlists->merge_from(ivfs_data, ivfs.size()); // TODO: this has a verbose parameter I can use
+                // std::cout << "----here----" << std::endl;
+
+                // index->ntotal = ntotal;
+                // index->replace_invlists(invlists, true);
+                // invlists.this.disown(); ????????????????????????
+
+                faiss::write_index(index, output);
+
+            } catch (const std::exception &e) {
+                std::cerr << "standard exception!" << std::endl;
+                std::cerr << e.what() << std::endl;
+                throw e;
+            } catch (...) {
+                std::cerr << "unknown exception!" << std::endl;
+                throw;
+            }
+        })
+    };
+}
diff --git a/disk-test/Cargo.toml b/disk-test/Cargo.toml
new file mode 100644
index 0000000000..10ad0d796e
--- /dev/null
+++ b/disk-test/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "disk-test"
+edition.workspace = true
+rust-version.workspace = true
+version.workspace = true
+keywords.workspace = true
+authors.workspace = true
+documentation.workspace = true
+repository.workspace = true
+license.workspace = true
+readme.workspace = true
+homepage.workspace = true
+
+[dependencies]
+disk-faiss = { path = "../disk-faiss" }
+faiss = { path = "../faiss-rs" }
+
+[dev-dependencies]
+criterion = "0.5.1"
+rand = "0.8.5"
+
+[[bench]]
+name = "bench"
+harness = false
diff --git a/disk-test/benches/bench.rs b/disk-test/benches/bench.rs
new file mode 100644
index 0000000000..f1520b894d
--- /dev/null
+++ b/disk-test/benches/bench.rs
@@ -0,0 +1,214 @@
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
+use disk_faiss::merge_ondisk;
+use disk_test::{FVecs, FVecsReader};
+use faiss::{index_factory, read_index, write_index, Idx, Index, MetricType};
+use rand::{self, RngCore};
+use std::fs;
+
+struct SerialGenerator {
+    current: usize,
+}
+
+impl SerialGenerator {
+    fn new() -> Self {
+        Self { current: 0 }
+    }
+}
+
+impl Iterator for SerialGenerator {
+    type Item = usize;
+    fn next(&mut self) -> Option<Self::Item> {
+        self.current += 1;
+        Some(self.current)
+    }
+}
+
+// fn prepare_index_simple(index_type: &str, learn: &str, base: &str, dataset_id: &str) -> String {
+//     let base_path = format!("/tmp/faiss-disk-test/{dataset_id}/");
+//     fs::create_dir_all(base_path.clone()).unwrap();
+//     let tmpfile = |filename: &str| base_path.clone() + filename.as_ref();
+
+//     println!("Training index");
+//     let FVecs {
+//         dimensions,
+//         vectors,
+//     } = FVecs::from_file(learn).unwrap();
+//     let mut index = index_factory(dimensions as u32, index_type, MetricType::InnerProduct).unwrap();
+//     index.train(vectors.as_slice()).unwrap();

+//     let index = index.into_ivf_flat().unwrap();
+//     write_index(&index, tmpfile("trained.index")).unwrap();

+//     let full_dataset = FVecs::from_file(base).unwrap();
+//     let vectors = &full_dataset.vectors;
+//     let num_vectors = vectors.len() / dimensions as usize;
+//     println!("Splitting {num_vectors} vectors into 4 files");

+//     let num_chunks = 4;
+//     let vectors_per_chunk = num_vectors / num_chunks + 1;
+//     let chunk_files: Vec<_> = (0..num_chunks)
+//         .map(|chunk_number| tmpfile(format!("block_{chunk_number}.index").as_str()))
+//         .collect();
+//     let chunk_files: Vec<_> = chunk_files.iter().map(|f| f.as_str()).collect();

+//     for ((chunk_number, chunk), filename) in vectors
+//         .chunks(vectors_per_chunk * dimensions as usize)
+//         .enumerate()
+//         .zip(chunk_files.iter())
+//     {
+//         let first_id = vectors_per_chunk * chunk_number;
+//         let ids_range = first_id..(first_id + chunk.len());
+//         let ids: Vec<_> = ids_range.map(|id| Idx::from(id as i64)).collect();
+//         let mut index = read_index(tmpfile("trained.index")).unwrap();
+//         index.add_with_ids(chunk, ids.as_slice()).unwrap();
+//         write_index(&index, filename).unwrap();
+//     }

+//     println!("merging indexes on disk");
+//     merge_ondisk(
+//         &tmpfile("trained.index"),
+//         chunk_files,
+//         &tmpfile("merged_index.ivfdata"),
+//         &tmpfile("populated.index"),
+//     );
+//     tmpfile("populated.index")
+// }
+
+fn prepare_index(index_type: &str, learn: &str, base: &[&str], dataset_id: &str) -> String {
+    let base_path = format!("/tmp/faiss-disk-test/{dataset_id}/");
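+    // Overall flow (mirroring faiss's contrib/ondisk.py recipe): train one IVF
+    // index, write a populated shard index per chunk of the base vectors, then
+    // merge the shards' inverted lists into a single on-disk index via merge_ondisk.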
+    fs::create_dir_all(base_path.clone()).unwrap();
+    let tmpfile = |filename: &str| base_path.clone() + filename.as_ref();
+
+    println!("Training index");
+    let FVecs {
+        dimensions,
+        vectors,
+    } = FVecs::from_file(learn).unwrap();
+    let mut index = index_factory(dimensions as u32, index_type, MetricType::InnerProduct).unwrap();
+    index.train(vectors.as_slice()).unwrap();
+
+    let index = index.into_ivf_flat().unwrap();
+    write_index(&index, tmpfile("trained.index")).unwrap();
+
+    let shards = base
+        .iter()
+        .enumerate()
+        .map(|(index, filename)| {
+            println!("opening new file {filename}");
+            let dimensions = if index != 0 { Some(dimensions) } else { None };
+            FVecsReader::from_file(filename, 10_000_000, dimensions).unwrap()
+        })
+        .flat_map(|reader| reader.into_iter());
+    let shard_files_iter =
+        || (0..usize::MAX).map(|file_num| tmpfile(format!("shard_{file_num}.index").as_str()));
+
+    let mut id_gen = 0..usize::MAX;
+    for (chunk, filename) in shards.zip(shard_files_iter()) {
+        let num_vecs = chunk.len() / dimensions;
+        println!("Processing {} vectors", num_vecs);
+        let ids: Vec<_> = id_gen
+            .by_ref()
+            .take(num_vecs)
+            .map(|id| Idx::from(id as i64))
+            .collect();
+        let mut index = read_index(tmpfile("trained.index")).unwrap();
+        index.add_with_ids(&chunk, &ids).unwrap();
+        println!("Writing them to {filename}");
+        write_index(&index, filename).unwrap();
+    }
+
+    let all_shard_files: Vec<_> = shard_files_iter()
+        .take_while(|filename| match fs::metadata(filename) {
+            Err(_) => false,
+            Ok(metadata) => metadata.is_file(),
+        })
+        .collect();
+    let all_shard_files: Vec<_> = all_shard_files
+        .iter()
+        .map(|filename| filename.as_str())
+        .collect();
+
+    println!("merging indexes: {}", all_shard_files.join(", "));
+    merge_ondisk(
+        &tmpfile("trained.index"),
+        all_shard_files,
+        &tmpfile("merged_index.ivfdata"),
+        &tmpfile("populated.index"),
+    );
+    tmpfile("populated.index")
+}
+
+fn bench(c: &mut Criterion) {
+    let mut rng = rand::thread_rng();
+
+    // let index_path = prepare_index(
+    //     "IVF4096,Flat",
+    //     "resources/sift/sift_learn.fvecs",
+    //     &["resources/sift/sift_base.fvecs"],
+    //     "sift",
+    // );
+    // let mut index = read_index(index_path).unwrap();
+    // let query_batch = FVecs::from_file("resources/sift/sift_query.fvecs").unwrap();
+    // let queries: Vec<_> = query_batch.split().collect();
+    // c.bench_function("sift 1M", |b| {
+    //     b.iter_batched(
+    //         || queries[rng.next_u64() as usize % queries.len()],
+    //         |query| index.search(query, 1).unwrap(),
+    //         BatchSize::SmallInput,
+    //     );
+    // });
+
+    // let index_path = prepare_index(
+    //     "IVF4096,Flat",
+    //     "resources/deep/deep10M.fvecs",
+    //     &["resources/deep/deep10M.fvecs"],
+    //     "deep10",
+    // );
+    // let mut index = read_index(index_path).unwrap();
+    // let query_batch = FVecs::from_file("resources/deep/deep1B_queries.fvecs").unwrap();
+    // let queries: Vec<_> = query_batch.split().collect();
+    // c.bench_function("deep 10M", |b| {
+    //     b.iter_batched(
+    //         || queries[rng.next_u64() as usize % queries.len()],
+    //         |query| index.search(query, 1).unwrap(),
+    //         BatchSize::SmallInput,
+    //     );
+    // });
+
+    let index_path = prepare_index(
+        "IVF262144_HNSW32,Flat",
+        "resources/deep/deep10M.fvecs",
+        &[
+            "resources/deep/base_00",
+            "resources/deep/base_01",
+            "resources/deep/base_02",
+            "resources/deep/base_03",
+            "resources/deep/base_00",
+            "resources/deep/base_01",
+            "resources/deep/base_02",
+            "resources/deep/base_03",
+            "resources/deep/base_00",
+            "resources/deep/base_01",
+            "resources/deep/base_02",
"resources/deep/base_03", + ], + // &["resources/deep/deep10M.fvecs"], + "deep", + ); + let mut index = read_index(index_path).unwrap(); + let query_batch = FVecs::from_file("resources/deep/deep1B_queries.fvecs").unwrap(); + let queries: Vec<_> = query_batch.split().collect(); + c.bench_function("deep 325M", |b| { + b.iter_batched( + || queries[rng.next_u64() as usize % queries.len()], + |query| index.search(query, 1).unwrap(), + BatchSize::SmallInput, + ); + }); +} + +criterion_group!(benches, bench); +criterion_main!(benches); diff --git a/disk-test/src/lib.rs b/disk-test/src/lib.rs new file mode 100644 index 0000000000..5d2b8c4e94 --- /dev/null +++ b/disk-test/src/lib.rs @@ -0,0 +1,97 @@ +use std::fs; +use std::fs::File; +use std::io::ErrorKind::InvalidData; +use std::io::Read; +use std::io::{BufReader, Error as IoError}; + +pub struct FVecs { + pub dimensions: usize, + pub vectors: Vec, +} + +impl FVecs { + pub fn from_file(file_name: &str) -> Result { + let data = fs::read(file_name)?; + let (dim_data, vector_data) = data.split_at(4); + let dim = dim_data + .try_into() + .map_err(|e| IoError::new(InvalidData, e))?; + let dimensions = u32::from_le_bytes(dim) as usize; + let vectors: Vec<_> = vector_data + .chunks_exact(4) + .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap())) + .collect(); + + Ok(Self { + dimensions, + vectors, + }) + } + + pub fn split(&self) -> impl Iterator { + self.vectors.chunks(self.dimensions) + } + + pub fn get(&self, index: usize) -> Option<&[f32]> { + self.split().nth(index) + } +} + +pub struct FVecsReader { + pub dimensions: usize, + reader: BufReader, + chunk_size: usize, +} + +impl FVecsReader { + pub fn from_file( + filename: &str, + chunk_size: usize, + dimensions: Option, + ) -> Result { + let file = File::open(filename)?; + let mut reader = BufReader::new(file); + let mut buffer = vec![0; 4]; + let dimensions = + dimensions + .map(|d| Ok(d)) + .unwrap_or_else(|| match reader.read(&mut buffer) { + Ok(4) => Ok(u32::from_le_bytes(buffer.try_into().unwrap()) as usize), + Ok(_) => Err(IoError::from(InvalidData)), + Err(e) => Err(e), + })?; + + println!("dimensions for vectors in file {filename}: {dimensions}"); + + Ok(Self { + dimensions, + reader, + chunk_size, + }) + } +} + +impl Drop for FVecsReader { + fn drop(&mut self) { + println!("dropping file reader"); + } +} + +impl Iterator for FVecsReader { + type Item = Vec; + fn next(&mut self) -> Option { + let mut buffer = vec![0; self.chunk_size * self.dimensions * 4]; + match self.reader.read(&mut buffer) { + Err(_) => None, + Ok(0) => None, + Ok(n) => { + buffer.truncate(n); + let floats = buffer + .chunks_exact(4) + .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap())) + .collect(); + Some(floats) + } + } + } +} diff --git a/disk-test/src/main.rs b/disk-test/src/main.rs new file mode 100644 index 0000000000..2cb91d53d9 --- /dev/null +++ b/disk-test/src/main.rs @@ -0,0 +1,64 @@ +use disk_faiss::merge_ondisk; +use disk_test::FVecs; +use faiss::{index_factory, read_index, write_index, Idx, Index, MetricType}; + +fn tmpfile>(filename: S) -> String { + "/tmp/faiss-disk-test/".to_owned() + filename.as_ref() +} + +fn main() { + println!("Training index"); + let FVecs { + dimensions, + vectors, + } = FVecs::from_file("resources/sift/sift_learn.fvecs").unwrap(); + let mut index = + index_factory(dimensions as u32, "IVF4096,Flat", MetricType::InnerProduct).unwrap(); + index.train(vectors.as_slice()).unwrap(); + + let index = index.into_ivf_flat().unwrap(); + write_index(&index, 
tmpfile("trained.index")).unwrap(); + + println!("Splitting vectors into files"); + let vectors = FVecs::from_file("resources/sift/sift_base.fvecs") + .unwrap() + .vectors; + + let num_chunks = 4; + let num_vectors = vectors.len() / dimensions as usize; + let vectors_per_chunk = num_vectors / num_chunks + 1; + let chunk_files: Vec<_> = (0..num_chunks) + .map(|chunk_number| tmpfile(format!("block_{chunk_number}.index"))) + .collect(); + let chunk_files: Vec<_> = chunk_files.iter().map(|f| f.as_str()).collect(); + + for ((chunk_number, chunk), filename) in vectors + .chunks(vectors_per_chunk * dimensions as usize) + .enumerate() + .zip(chunk_files.iter()) + { + let first_id = vectors_per_chunk * chunk_number; + let ids_range = first_id..(first_id + chunk.len()); + let ids: Vec<_> = ids_range.map(|id| Idx::from(id as i64)).collect(); + let mut index = read_index(tmpfile("trained.index")).unwrap(); + index.add_with_ids(chunk, ids.as_slice()).unwrap(); + write_index(&index, filename).unwrap(); + } + + println!("merging indexes on disk"); + merge_ondisk( + &tmpfile("trained.index"), + chunk_files, + &tmpfile("merged_index.ivfdata"), + &tmpfile("populated.index"), + ); + + println!("using the ondisk index"); + let mut index = read_index(&tmpfile("populated.index")).unwrap(); + let queries = FVecs::from_file("resources/sift/sift_query.fvecs").unwrap(); + let first_query = queries.get(0).unwrap(); + + let result = index.search(first_query, 5).unwrap(); + println!("result: {result:?}"); + // println!("success!!!!!!!"); +} diff --git a/faiss-rs b/faiss-rs new file mode 160000 index 0000000000..358f4cd98b --- /dev/null +++ b/faiss-rs @@ -0,0 +1 @@ +Subproject commit 358f4cd98b4bdf66013e9b792140944e6b275d69 diff --git a/raphtory-benchmark/Cargo.toml b/raphtory-benchmark/Cargo.toml index 2dc29e3e0a..7e5b706862 100644 --- a/raphtory-benchmark/Cargo.toml +++ b/raphtory-benchmark/Cargo.toml @@ -6,11 +6,13 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -criterion = "0.5.1" -raphtory = { path = "../raphtory" , features=["io"]} +criterion = { version = "0.5.1", features = ["async_tokio"] } +tokio = { version = "1.27.0", features = ["full"] } +raphtory = { path = "../raphtory", features = ["io", "vectors"] } sorted_vector_map = "0.1" rand = "0.8.5" rayon = "1" +faiss = { path = "../faiss-rs" } [[bench]] name = "tgraph_benchmarks" @@ -35,3 +37,7 @@ harness = false [[bench]] name = "edge_add" harness = false + +[[bench]] +name = "vectors" +harness = false diff --git a/raphtory-benchmark/benches/vectors.rs b/raphtory-benchmark/benches/vectors.rs new file mode 100644 index 0000000000..a9efac840e --- /dev/null +++ b/raphtory-benchmark/benches/vectors.rs @@ -0,0 +1,209 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::{ + distributions::{Alphanumeric, DistString}, + thread_rng, Rng, +}; +use raphtory::core::DocumentInput; +use raphtory::db::graph::views::deletion_graph::GraphWithDeletions; +use raphtory::vectors::{document_template::DocumentTemplate, vectorisable::Vectorisable}; +use raphtory::{core::entities::nodes::input_node::InputNode, prelude::*, vectors::Embedding}; +use std::path::PathBuf; +use tokio::runtime::Runtime; + +mod common; + +async fn random_embedding(texts: Vec) -> Vec { + let mut rng = thread_rng(); + texts + .iter() + .map(|_| (0..1536).map(|_| rng.gen()).collect()) + .collect() +} + +struct EmptyTemplate; + +impl DocumentTemplate for EmptyTemplate { + fn graph(&self, _graph: &Graph) -> 
+    fn graph(&self, _graph: &Graph) -> Box<dyn Iterator<Item = DocumentInput>> {
+        Box::new(std::iter::empty())
+    }
+
+    fn node(
+        &self,
+        _node: &raphtory::db::graph::node::NodeView<Graph>,
+    ) -> Box<dyn Iterator<Item = DocumentInput>> {
+        Box::new(std::iter::once("".into()))
+    }
+
+    fn edge(
+        &self,
+        _edge: &raphtory::db::graph::edge::EdgeView<Graph>,
+    ) -> Box<dyn Iterator<Item = DocumentInput>> {
+        Box::new(std::iter::once("".into()))
+    }
+}
+
+pub fn vectors(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+    let g = Graph::new();
+    for id in 0..500_000 {
+        g.add_node(0, id, NO_PROPS, None).unwrap();
+    }
+    for id in 0..500_000 {
+        g.add_edge(0, 0, id, NO_PROPS, None).unwrap();
+    }
+    let query = rt.block_on(random_embedding(vec!["".to_owned()])).remove(0);
+    let cache_path = || Some(PathBuf::from("/tmp/raphtory/vector-bench"));
+
+    let native_vectorised_graph = rt.block_on(g.vectorise_with_template(
+        Box::new(random_embedding),
+        cache_path(),
+        true,
+        EmptyTemplate,
+        false, // use faiss
+        false,
+    ));
+    c.bench_function("native-index", |b| {
+        b.iter(|| native_vectorised_graph.append_by_similarity(&query, 1, None));
+    });
+
+    let faiss_vectorised_graph = rt.block_on(g.vectorise_with_template(
+        Box::new(random_embedding),
+        cache_path(),
+        true,
+        EmptyTemplate,
+        true, // use faiss
+        false,
+    ));
+    c.bench_function("faiss-index", |b| {
+        b.iter(|| faiss_vectorised_graph.append_by_similarity(&query, 1, None));
+    });
+}
+
+struct FVecsContent {
+    dimensions: u32,
+    vectors: Vec<f32>,
+}
+
+use std::fs;
+use std::io::Error as IoError;
+use std::io::ErrorKind::InvalidData;
+
+fn read_fvecs(file_name: &str) -> Result<FVecsContent, IoError> {
+    let data = fs::read(file_name)?;
+    let (dim_data, vector_data) = data.split_at(4);
+    let dim = dim_data
+        .try_into()
+        .map_err(|e| IoError::new(InvalidData, e))?;
+    let dimensions = u32::from_le_bytes(dim);
+    let vectors: Vec<_> = vector_data
+        .chunks_exact(4)
+        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
+        .collect();
+
+    Ok(FVecsContent {
+        dimensions,
+        vectors,
+    })
+}
+
+// fn read_fvecs(file_name: &str) -> Result<FVecsContent, IoError> {
+//     let mut file = File::open(file_name)?;
+//     let mut buffer = [0u8; 4];
+//     file.read_exact(&mut buffer);
+//     let mut cursor = Cursor::new(buffer);
+//     let value = cursor.read_i32::<LittleEndian>()?;
+//     ...
+// }
+
+use faiss::index::io_flags::IoFlags;
+use faiss::index::IndexImpl;
+use faiss::index::{io::read_index_with_flags, NativeIndex};
+use faiss::{index_factory, read_index, write_index, Idx, Index, MetricType};
+
+// this is based on https://github.com/facebookresearch/faiss/blob/12b92e9fa5d8e8fb3da53c57af9ff007c826b1ee/contrib/ondisk.py
+fn merge_on_disk(trained_index: &IndexImpl, shard_fnames: Vec<&str>, ivfdata_fname: &str) {
+    // assert not isinstance(
+    //     trained_index, faiss.IndexIVFPQR
+    // ), "IndexIVFPQR is not supported as an on disk index."
+    //
+
+    let ivfs = shard_fnames.iter().map(|filename| {
+        let index = read_index_with_flags(filename, IoFlags::MEM_MAP).unwrap();
+        index.into_ivf_flat().unwrap()
+    });
+
+    let index_ivf = trained_index.into_ivf_flat().unwrap();
+
+    assert_eq!(trained_index.ntotal(), 0, "works only on empty index");
+
+    // FIXME: can't figure out next line !!
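+    // (the missing piece is a binding for faiss.OnDiskInvertedLists, which
+    // faiss-rs does not appear to expose; a working C++ version of this merge
+    // lives in disk-faiss/src/lib.rs as merge_ondisk)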
+    // invlists = faiss.OnDiskInvertedLists(
+    //     index_ivf.nlist, index_ivf.code_size, ivfdata_fname
+    // )
+
+    // # merge all the inverted lists
+    // ivf_vector = faiss.InvertedListsPtrVector()
+    // for ivf in ivfs:
+    //     ivf_vector.push_back(ivf)
+
+    // LOG.info("merge %d inverted lists " % ivf_vector.size())
+    // ntotal = invlists.merge_from(ivf_vector.data(), ivf_vector.size())
+
+    // # now replace the inverted lists in the output index
+    // index.ntotal = index_ivf.ntotal = ntotal
+    // index_ivf.replace_invlists(invlists, True)
+    // invlists.this.disown()
+}
+
+pub fn faiss(c: &mut Criterion) {
+    let rt = Runtime::new().unwrap();
+
+    let tmpfile = |filename: &str| "/tmp/faiss-disk-test/".to_owned() + filename;
+
+    println!("Training index");
+    let FVecsContent {
+        dimensions,
+        vectors,
+    } = read_fvecs("resources/sift/sift_learn.fvecs").unwrap();
+    // println!("dimensions -> {dimensions}");
+    // let sample: Vec<_> = flattened_vectors.iter().take(8).collect();
+    // println!("sample -> {sample:?}");
+    let mut index = index_factory(dimensions, "IVF4096,Flat", MetricType::InnerProduct).unwrap();
+    index.train(vectors.as_slice()).unwrap();
+    write_index(&index, tmpfile("trained.index")).unwrap();
+
+    println!("Splitting vectors into files");
+    let vectors = read_fvecs("resources/sift/sift_base.fvecs")
+        .unwrap()
+        .vectors;
+
+    let num_vectors = vectors.len() / dimensions as usize;
+    let vectors_per_chunk = num_vectors / 4 + 1;
+
+    for (chunk_number, chunk) in vectors
+        .chunks(vectors_per_chunk * dimensions as usize)
+        .enumerate()
+    {
+        let first_id = vectors_per_chunk * chunk_number;
+        let ids_range = first_id..(first_id + chunk.len());
+        let ids: Vec<_> = ids_range.map(|id| Idx::from(id as i64)).collect();
+        let mut index = read_index(tmpfile("trained.index")).unwrap();
+        index.add_with_ids(chunk, ids.as_slice()).unwrap();
+        let block_name = format!("block_{chunk_number}.index");
+        write_index(&index, tmpfile(block_name.as_str())).unwrap();
+    }
+
+    println!("loading trained index");
+    let index = read_index(tmpfile("trained.index")).unwrap();
+
+    // let block_fnames: Vec<_> = (0..4).map(|n_chunk| tmpfile(format!("block_{n_chunk}.index").as_str())).collect();
+    // merge_ondisk(index, block_fnames, tmpdir + "merged_index.ivfdata").unwrap();
+    // write_index(&index, tmpdir + "populated.index").unwrap();
+
+    // c.bench_function("faiss-index", |b| {
+    //     b.iter(|| ());
+    // });
+}
+
+criterion_group!(benches, faiss);
+criterion_main!(benches);
diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs
index 66e266a4f8..a4511f727b 100644
--- a/raphtory-graphql/src/server.rs
+++ b/raphtory-graphql/src/server.rs
@@ -116,6 +116,7 @@ impl RaphtoryServer {
                         true,
                         template.clone(),
                         true,
+                        true,
                     )
                     .await;
                 stores.write().insert(graph_name, vectorised);
diff --git a/raphtory/Cargo.toml b/raphtory/Cargo.toml
index 4479d4413f..8e36e71809 100644
--- a/raphtory/Cargo.toml
+++ b/raphtory/Cargo.toml
@@ -18,21 +18,25 @@ homepage.workspace = true
 bincode = "1"
 chrono = { version = "0.4.31", features = ["serde"] }
 genawaiter = "0.99"
-itertools= "0.12.0"
+itertools = "0.12.0"
 num-traits = "0.2"
-parking_lot = { version = "0.12" , features = ["serde", "arc_lock", "send_guard"] }
+parking_lot = { version = "0.12", features = [
+    "serde",
+    "arc_lock",
+    "send_guard",
+] }
 once_cell = "1"
 rand = "0.8.5"
 rand_distr = "0.4.3"
 rayon = "1"
 regex = "1"
 rustc-hash = "1.1.0"
-serde = { version = "1", features = ["derive","rc"] }
+serde = { version = "1", features = ["derive", "rc"] }
= "0.1" thiserror = "1" twox-hash = "1.6.3" lock_api = { version = "0.4", features = ["arc_lock", "serde"] } -dashmap = {version ="5", features = ["serde"] } +dashmap = { version = "5", features = ["serde"] } enum_dispatch = "0.3" ordered-float = "4.1.1" glam = "0.25.0" @@ -40,29 +44,40 @@ quad-rand = "0.2.1" serde_json = "1" # io optional dependencies -csv = {version="1.1.6", optional=true} -zip = {version ="0.6.6", optional=true} -neo4rs = {version="0.6.1", optional=true} -bzip2 = {version="0.4", optional=true} -flate2 = {version="1.0", optional=true} -reqwest = { version = "0.11.14", features = ["blocking", "rustls-tls"], default-features = false, optional=true} -tokio = { version = "1.27.0", features = ["full"], optional=true} +csv = { version = "1.1.6", optional = true } +zip = { version = "0.6.6", optional = true } +neo4rs = { version = "0.6.1", optional = true } +bzip2 = { version = "0.4", optional = true } +flate2 = { version = "1.0", optional = true } +reqwest = { version = "0.11.14", features = [ + "blocking", + "rustls-tls", +], default-features = false, optional = true } +tokio = { version = "1.27.0", features = ["full"], optional = true } # search optional dependencies -tantivy = {version= "0.21.1", optional=true} +tantivy = { version = "0.21.1", optional = true } # vectors optional dependencies -futures-util = {version="0.3.0", optional=true} -async-trait = {version="0.1.73", optional=true} -async-openai = {version= "0.17.1", optional=true} +futures-util = { version = "0.3.0", optional = true } +async-trait = { version = "0.1.73", optional = true } +async-openai = { version = "0.17.1", optional = true } +# faiss = { version = "0.12.1", optional = true, features = ["static"] } +# faiss = { path = "../faiss-rs", optional = true, features = ["static"] } +faiss = { path = "../faiss-rs", optional = true } # python binding optional dependencies -pyo3 = {version= "0.20.0", features=["multiple-pymethods", "chrono"], optional=true} -pyo3-asyncio = { version = "0.20.0", features = ["tokio-runtime"], optional=true } -num = {version="0.4.0", optional=true} -display-error-chain = {version= "0.2.0", optional=true} -arrow2 = {version= "0.18.0", optional=true} -kdam = { version="0.5.1", features = ["notebook"], optional = true} +pyo3 = { version = "0.20.0", features = [ + "multiple-pymethods", + "chrono", +], optional = true } +pyo3-asyncio = { version = "0.20.0", features = [ + "tokio-runtime", +], optional = true } +num = { version = "0.4.0", optional = true } +display-error-chain = { version = "0.2.0", optional = true } +arrow2 = { version = "0.18.0", optional = true } +kdam = { version = "0.5.1", features = ["notebook"], optional = true } [dev-dependencies] @@ -72,18 +87,39 @@ quickcheck = "1" quickcheck_macros = "1" tempfile = "3.2" tempdir = "0.3" -tokio = { version = "1.27.0", features = ["full"]} # for vector testing -dotenv = "0.15.0" # for vector testing +tokio = { version = "1.27.0", features = ["full"] } # for vector testing +dotenv = "0.15.0" # for vector testing streaming-stats = "0.2" proptest = "1.4.0" [features] default = [] # Enables the graph loader io module -io = ["dep:zip", "dep:neo4rs", "dep:bzip2", "dep:flate2", "dep:csv", "dep:reqwest", "dep:tokio"] +io = [ + "dep:zip", + "dep:neo4rs", + "dep:bzip2", + "dep:flate2", + "dep:csv", + "dep:reqwest", + "dep:tokio", +] # Enables generating the pyo3 python bindings -python = ["io", "dep:pyo3", "dep:pyo3-asyncio", "dep:num", "dep:display-error-chain", "dep:arrow2", "dep:kdam"] +python = [ + "io", + "dep:pyo3", + 
"dep:pyo3-asyncio", + "dep:num", + "dep:display-error-chain", + "dep:arrow2", + "dep:kdam", +] # search search = ["dep:tantivy"] # vectors -vectors = ["dep:futures-util", "dep:async-trait", "dep:async-openai"] +vectors = [ + "dep:futures-util", + "dep:async-trait", + "dep:async-openai", + "dep:faiss", +] diff --git a/raphtory/src/python/packages/vectors.rs b/raphtory/src/python/packages/vectors.rs index eef3e2e51b..9db528764a 100644 --- a/raphtory/src/python/packages/vectors.rs +++ b/raphtory/src/python/packages/vectors.rs @@ -31,7 +31,7 @@ use pyo3::{ prelude::*, types::{PyFunction, PyList}, }; -use std::{path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; pub type PyWindow = Option<(PyTime, PyTime)>; @@ -300,7 +300,7 @@ impl PyGraphView { /// /// Returns: /// A VectorisedGraph with all the documents/embeddings computed and with an initial empty selection - #[pyo3(signature = (embedding, cache = None, overwrite_cache = false, graph_document = None, node_document = None, edge_document = None, verbose = false))] + #[pyo3(signature = (embedding, cache = None, overwrite_cache = false, graph_document = None, node_document = None, edge_document = None, index = "native", verbose = false))] fn vectorise( &self, embedding: &PyFunction, @@ -309,23 +309,34 @@ impl PyGraphView { graph_document: Option, node_document: Option, edge_document: Option, + index: &str, verbose: bool, - ) -> DynamicVectorisedGraph { + ) -> PyResult { let embedding: Py = embedding.into(); let graph = self.graph.clone(); let cache = cache.map(PathBuf::from); let template = PyDocumentTemplate::new(graph_document, node_document, edge_document); - execute_async_task(move || async move { + + let index_enum = HashMap::from([("flat", true), ("native", false)]); + let use_faiss = index_enum.get(index).cloned().ok_or_else(|| { + let valid_values = index_enum.keys().join(", "); + let message = format!("invalid value for `index`. 
+            let message = format!("invalid value for `index`. Valid values are: {valid_values}");
+            PyAttributeError::new_err(message)
+        })?;
+
+        let vectorised_graph = execute_async_task(move || async move {
             graph
                 .vectorise_with_template(
                     Box::new(embedding.clone()),
                     cache,
                     overwrite_cache,
                     Arc::new(template) as Arc<dyn DocumentTemplate<DynamicGraph>>,
+                    use_faiss,
                     verbose,
                 )
                 .await
-        })
+        });
+        Ok(vectorised_graph)
     }
 }
diff --git a/raphtory/src/vectors/document_ref.rs b/raphtory/src/vectors/document_ref.rs
index 55dd920491..67cf5e6ce2 100644
--- a/raphtory/src/vectors/document_ref.rs
+++ b/raphtory/src/vectors/document_ref.rs
@@ -8,6 +8,7 @@ use crate::{
 use serde::{Deserialize, Serialize};
 use std::hash::{Hash, Hasher};
 
+// TODO: this is not a reference, find another name, like CompressedDocument
 /// this struct contains the minimum amount of information need to regenerate a document using a
 /// template and to quickly apply windows over them
 #[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/raphtory/src/vectors/embeddings.rs b/raphtory/src/vectors/embeddings.rs
index 18e949c339..111aa3ae43 100644
--- a/raphtory/src/vectors/embeddings.rs
+++ b/raphtory/src/vectors/embeddings.rs
@@ -18,33 +18,3 @@ pub async fn openai_embedding(texts: Vec<String>) -> Vec<Embedding> {
     println!("Generated embeddings successfully");
     response.data.into_iter().map(|e| e.embedding).collect_vec()
 }
-
-// this is currently commented out so we don't need to add any new dependencies
-// but might be potentially useful in the future
-// async fn sentence_transformers_embeddings(texts: Vec<String>) -> Vec<Embedding> {
-//     println!("computing embeddings for {} texts", texts.len());
-//     Python::with_gil(|py| {
-//         let sentence_transformers = py.import("sentence_transformers")?;
-//         let locals = [("sentence_transformers", sentence_transformers)].into_py_dict(py);
-//         locals.set_item("texts", texts);
-//
-//         let pyarray: &PyArray2<f32> = py
-//             .eval(
-//                 &format!(
-//                     "sentence_transformers.SentenceTransformer('thenlper/gte-small').encode(texts)"
-//                 ),
-//                 Some(locals),
-//                 None,
-//             )?
-//             .extract()?;
-//
-//         let readonly = pyarray.readonly();
-//         let chunks = readonly.as_slice().unwrap().chunks(384).into_iter();
-//         let embeddings = chunks
-//             .map(|chunk| chunk.iter().copied().collect_vec())
-//             .collect_vec();
-//
-//         Ok::<Vec<Vec<f32>>, Box<dyn std::error::Error>>(embeddings)
-//     })
-//     .unwrap()
-// }
diff --git a/raphtory/src/vectors/faiss_store.rs b/raphtory/src/vectors/faiss_store.rs
new file mode 100644
index 0000000000..d2cf84759d
--- /dev/null
+++ b/raphtory/src/vectors/faiss_store.rs
@@ -0,0 +1,106 @@
+use super::{document_ref::DocumentRef, entity_id::EntityId, Embedding};
+use faiss::{index::IndexImpl, index_factory, Idx, Index, MetricType};
+use itertools::Itertools;
+use std::collections::HashMap;
+
+
+trait ExternalVectorIndex {
+
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct DocumentPointer {
+    pub(crate) entity: EntityId,
+    pub(crate) subindex: usize, // TODO: reduce this inside and provide a nice error when there are too many documents for some entity
+}
+
+pub(crate) struct FaissIndex {
+    mapping: Vec<DocumentPointer>,
+    index: IndexImpl,
+}
+
+impl FaissIndex {
+    fn get(&self, idx: u64) -> DocumentPointer {
+        self.mapping.get(idx as usize).unwrap().clone()
+    }
+
+    /// This function returns a vector just to take ownership of Faiss results
+    pub(crate) fn search(&mut self, query: &Embedding, limit: usize) -> Vec<DocumentPointer> {
+        // TODO: assert that the length of the query is correct
+        let result = self.index.search(query.as_slice(), limit);
+        match result {
+            Ok(result) => {
+                let valid_labels = result.labels.iter().filter_map(|idx| idx.get());
+                valid_labels.map(|idx| self.get(idx)).collect_vec()
+            }
+            Err(_) => vec![],
+        }
+    }
+}
+
+pub(crate) struct FaissStore {
+    pub(crate) nodes: FaissIndex,
+    pub(crate) edges: FaissIndex,
+}
+
+impl FaissStore {
+    pub(crate) fn from_refs(
+        nodes: &HashMap<EntityId, Vec<DocumentRef>>,
+        edges: &HashMap<EntityId, Vec<DocumentRef>>,
+    ) -> Self {
+        // TODO: review, this doesn't group if there are empty groups!
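+        // probe a single document to discover the embedding dimensionality,
+        // defaulting to 1 when both maps are empty so index_factory still succeeds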
+        let maybe_node_group = nodes.iter().next();
+        let maybe_edge_group = edges.iter().next();
+        let maybe_group = maybe_node_group.or(maybe_edge_group);
+        let maybe_vector = maybe_group.and_then(|(_, docs)| docs.get(0));
+        let dim = maybe_vector.map(|vec| vec.embedding.len()).unwrap_or(1) as u32;
+        Self {
+            nodes: build_entity_index(nodes, dim),
+            edges: build_entity_index(edges, dim),
+        }
+    }
+
+    // pub(crate) fn search_nodes(
+    //     &self,
+    //     query: Embedding,
+    //     limit: usize,
+    // ) -> Vec<(DocumentPointer, f32)> {
+    //     search(&self.nodes, query, limit)
+    // }
+
+    // pub(crate) fn search_edges(
+    //     &self,
+    //     query: Embedding,
+    //     limit: usize,
+    // ) -> Vec<(DocumentPointer, f32)> {
+    //     search(&self.edges, query, limit)
+    // }
+}
+
+fn build_entity_index(entities: &HashMap<EntityId, Vec<DocumentRef>>, dim: u32) -> FaissIndex {
+    let mut index = index_factory(dim, "IVF4096_HNSW32,Flat", MetricType::InnerProduct).unwrap();
+
+    let flattened = entities.iter().flat_map(|(entity, docs)| {
+        docs.iter()
+            .enumerate()
+            .map(|(subindex, doc)| (entity.clone(), subindex, doc))
+    });
+    let mapping = flattened
+        .clone()
+        .map(|(entity, subindex, _)| DocumentPointer { entity, subindex })
+        .collect_vec();
+    let data_vec = flattened
+        .clone()
+        .flat_map(|(_, _, doc)| doc.embedding.clone())
+        .collect_vec();
+    let data = data_vec.as_slice();
+    let ids: Vec<Idx> = flattened
+        .enumerate()
+        .map(|(id, _)| (id as i64).into())
+        .collect_vec();
+
+    index.train(data).unwrap();
+    index.add_with_ids(data, ids.as_slice()).unwrap();
+
+    FaissIndex { index, mapping }
+}
diff --git a/raphtory/src/vectors/mod.rs b/raphtory/src/vectors/mod.rs
index 81abb6e722..caa5703fbc 100644
--- a/raphtory/src/vectors/mod.rs
+++ b/raphtory/src/vectors/mod.rs
@@ -7,6 +7,7 @@ pub mod document_template;
 mod embedding_cache;
 pub mod embeddings;
 mod entity_id;
+mod faiss_store;
 pub mod graph_entity;
 mod similarity_search_utils;
 pub mod splitting;
@@ -166,7 +167,7 @@ mod vector_tests {
         g.add_node(0, "test", NO_PROPS, None).unwrap();
 
         // the following succeeds with no cache set up
-        g.vectorise(Box::new(fake_embedding), None, true, false)
+        g.vectorise(Box::new(fake_embedding), None, true, false, false)
             .await;
 
         let path = "/tmp/raphtory/very/deep/path/embedding-cache-test";
@@ -178,6 +179,7 @@
             Some(PathBuf::from(path)),
             true,
             false,
+            false,
         )
         .await;
@@ -188,6 +190,7 @@
             Some(PathBuf::from(path)),
             true,
             false,
+            false,
         )
         .await;
     }
@@ -199,7 +202,7 @@
         let g = Graph::new();
         let cache = PathBuf::from("/tmp/raphtory/vector-cache-lotr-test");
         let vectors = g
-            .vectorise(Box::new(fake_embedding), Some(cache), true, false)
+            .vectorise(Box::new(fake_embedding), Some(cache), true, false, false)
             .await;
         let embedding: Embedding = fake_embedding(vec!["whatever".to_owned()]).await.remove(0);
         let docs = vectors
@@ -288,6 +291,7 @@ age: 30"###;
             true,
             FakeMultiDocumentTemplate,
             false,
+            false,
         )
         .await;
@@ -347,6 +351,7 @@ age: 30"###;
             true,
             FakeTemplateWithIntervals,
             false,
+            false,
         )
         .await;
@@ -429,6 +434,7 @@ age: 30"###;
             true,
             CustomTemplate,
             false,
+            false,
         )
         .await;
@@ -466,4 +472,122 @@ age: 30"###;
             .get_documents();
         assert!(docs[0].content().contains("Frodo appeared with Gandalf"));
     }
+
+    use faiss::{index_factory, Index, MetricType};
+
+    async fn predictable_embedding(texts: Vec<String>) -> Vec<Embedding> {
+        texts
+            .into_iter()
+            .map(|text| {
+                let index = text.parse::<usize>().unwrap();
+                let mut vector = vec![0.0; 10];
+                if let Some(element) = vector.get_mut(index) {
+                    *element = 1.0;
+                };
+                vector
+            })
+            .collect_vec()
+    }
+
+    struct PredictableTemplate;
+
+    impl<G: StaticGraphViewOps> DocumentTemplate<G> for PredictableTemplate {
+        fn graph(&self, graph: &G) -> Box<dyn Iterator<Item = DocumentInput>> {
+            DefaultTemplate.graph(graph)
+        }
+
+        fn node(&self, node: &NodeView<G>) -> Box<dyn Iterator<Item = DocumentInput>> {
+            Box::new(std::iter::once(node.name().to_string().into()))
+        }
+        fn edge(&self, edge: &EdgeView<G>) -> Box<dyn Iterator<Item = DocumentInput>> {
+            Box::new(std::iter::once(edge.src().name().to_string().into()))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_faiss_empty() {
+        let g = Graph::new();
+
+        let v = g
+            .vectorise_with_template(
+                Box::new(predictable_embedding),
+                None,
+                false,
+                PredictableTemplate,
+                true,
+                true,
+            )
+            .await;
+
+        let selection = v.append_nodes_by_similarity(&vec![5.0, 0.0, 0.0, 0.0, 0.0], 10, None);
+
+        let len = selection.get_documents().len();
+        assert_eq!(len, 0);
+    }
+
+    #[tokio::test]
+    async fn test_faiss_full() {
+        let g = Graph::new();
+        for n in 0..10 {
+            g.add_node(0, n, NO_PROPS, None);
+            g.add_edge(0, n, 1, NO_PROPS, None);
+        }
+
+        let v = g
+            .vectorise_with_template(
+                Box::new(predictable_embedding),
+                None,
+                false,
+                PredictableTemplate,
+                true,
+                true,
+            )
+            .await;
+
+        let query = vec![1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
+        let selection = v.append_nodes_by_similarity(&query, 20, None);
+
+        let len = selection.get_documents_with_scores().len();
+        assert_eq!(len, 10);
+    }
+
+    #[tokio::test]
+    async fn test_faiss_normal() {
+        let g = Graph::new();
+        for n in 0..10 {
+            g.add_node(0, n, NO_PROPS, None);
+            g.add_edge(0, n, 0, NO_PROPS, None);
+        }
+
+        let v = g
+            .vectorise_with_template(
+                Box::new(predictable_embedding),
+                None,
+                false,
+                PredictableTemplate,
+                true,
+                true,
+            )
+            .await;
+
+        let query = vec![1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];
+
+        let selection = v.append_nodes_by_similarity(&query, 5, None);
+        let (doc, score) = selection.get_documents_with_scores().remove(0);
+        assert_eq!(doc.into_content(), "0");
+        assert_eq!(score, 1.0);
+
+        let selection = v.append_edges_by_similarity(&query, 5, None);
+        let (doc, score) = selection.get_documents_with_scores().remove(0);
+        assert_eq!(doc.into_content(), "0");
+        assert_eq!(score, 1.0);
+
+        let selection = v.append_by_similarity(&query, 5, None);
+        let (doc, score) = selection.get_documents_with_scores().remove(0);
+        assert_eq!(doc.into_content(), "0");
+        assert_eq!(score, 1.0);
+        let (doc, score) = selection.get_documents_with_scores().remove(0);
+        assert_eq!(doc.into_content(), "0");
+        assert_eq!(score, 1.0);
+    }
 }
diff --git a/raphtory/src/vectors/similarity_search_utils.rs b/raphtory/src/vectors/similarity_search_utils.rs
index 40e53d813d..3f3f1974e8 100644
--- a/raphtory/src/vectors/similarity_search_utils.rs
+++ b/raphtory/src/vectors/similarity_search_utils.rs
@@ -41,7 +41,13 @@ fn cosine(vector1: &Embedding, vector2: &Embedding) -> f32 {
     let normalized = dot_product / (x_length.sqrt() * y_length.sqrt());
     // println!("cosine for {vector1:?} and {vector2:?} is {normalized}");
 
-    assert!(normalized <= 1.001);
-    assert!(normalized >= -1.001);
+    assert!(
+        normalized <= 1.001,
+        "invalid result: {normalized} for vectors:\n{vector1:?}\n{vector2:?}"
+    );
+    assert!(
+        normalized >= -1.001,
+        "invalid result: {normalized} for vectors:\n{vector1:?}\n{vector2:?}"
+    );
     normalized
 }
diff --git a/raphtory/src/vectors/vectorisable.rs b/raphtory/src/vectors/vectorisable.rs
index 564b0c8909..a72e4524c2 100644
--- a/raphtory/src/vectors/vectorisable.rs
+++ b/raphtory/src/vectors/vectorisable.rs
@@ -11,8 +11,11 @@ use crate::{
 };
 use async_trait::async_trait;
 use itertools::Itertools;
+use parking_lot::RwLock;
 use std::{collections::HashMap, path::PathBuf};
 
+use super::faiss_store::FaissStore;
+
 const CHUNK_SIZE: usize = 1000;
 
 #[derive(Clone, Debug)]
@@ -32,7 +35,7 @@ pub trait Vectorisable<G: StaticGraphViewOps> {
     /// * cache - the file to be used as a cache to avoid calling the embedding function
     /// * overwrite_cache - whether or not to overwrite the cache if there are new embeddings
     /// * verbose - whether or not to print logs reporting the progress
-    /// 
+    ///
     /// # Returns:
     ///   A VectorisedGraph with all the documents/embeddings computed and with an initial empty selection
     async fn vectorise(
         &self,
         embedding: Box<dyn EmbeddingFunction>,
         cache_file: Option<PathBuf>,
         override_cache: bool,
+        faiss: bool,
         verbose: bool,
     ) -> VectorisedGraph<G, DefaultTemplate>;
@@ -51,7 +55,7 @@
     /// * overwrite_cache - whether or not to overwrite the cache if there are new embeddings
     /// * template - the template to use to translate entities into documents
     /// * verbose - whether or not to print logs reporting the progress
-    /// 
+    ///
     /// # Returns:
     ///   A VectorisedGraph with all the documents/embeddings computed and with an initial empty selection
     async fn vectorise_with_template<T: DocumentTemplate<G>>(
         &self,
         embedding: Box<dyn EmbeddingFunction>,
         cache: Option<PathBuf>,
         override_cache: bool,
         template: T,
+        faiss: bool,
         verbose: bool,
     ) -> VectorisedGraph<G, T>;
 }
@@ -71,10 +76,18 @@ impl<G: StaticGraphViewOps> Vectorisable<G> for G {
         embedding: Box<dyn EmbeddingFunction>,
         cache: Option<PathBuf>,
         overwrite_cache: bool,
+        faiss: bool,
         verbose: bool,
     ) -> VectorisedGraph<G, DefaultTemplate> {
-        self.vectorise_with_template(embedding, cache, overwrite_cache, DefaultTemplate, verbose)
-            .await
+        self.vectorise_with_template(
+            embedding,
+            cache,
+            overwrite_cache,
+            DefaultTemplate,
+            faiss,
+            verbose,
+        )
+        .await
     }
 
     async fn vectorise_with_template<T: DocumentTemplate<G>>(
         &self,
         embedding: Box<dyn EmbeddingFunction>,
         cache: Option<PathBuf>,
         overwrite_cache: bool,
         template: T,
+        faiss: bool,
         verbose: bool,
     ) -> VectorisedGraph<G, T> {
         let graph_docs =
@@ -145,6 +159,13 @@
             cache_storage.iter().for_each(|cache| cache.dump_to_disk());
         }
 
+        let faiss_store = if faiss {
+            let store = FaissStore::from_refs(&node_refs, &edge_refs);
+            Some(RwLock::new(store).into())
+        } else {
+            None
+        };
+
         VectorisedGraph::new(
             self.clone(),
             template.into(),
@@ -152,6 +173,7 @@
             graph_refs.into(),
             node_refs.into(),
             edge_refs.into(),
+            faiss_store,
             vec![],
         )
     }
diff --git a/raphtory/src/vectors/vectorised_graph.rs b/raphtory/src/vectors/vectorised_graph.rs
index f2b2b2b8ee..395a9be3b6 100644
--- a/raphtory/src/vectors/vectorised_graph.rs
+++ b/raphtory/src/vectors/vectorised_graph.rs
@@ -15,12 +15,35 @@ use crate::{
     },
 };
 use itertools::{chain, Itertools};
+use parking_lot::RwLock;
 use std::{
     collections::{HashMap, HashSet},
     path::PathBuf,
     sync::Arc,
 };
 
+use super::faiss_store::{DocumentPointer, FaissStore};
+
+// enum IndexInput<'a> {
+//     Native(Box<dyn Iterator<Item = (&'a EntityId, &'a Vec<DocumentRef>)>>),
+//     Faiss(Vec<&'a mut FaissIndex>),
+// }
+
+// impl<'a, I: IntoIterator<Item = (&'a EntityId, &'a Vec<DocumentRef>)> + 'a> From<I>
+//     for IndexInput<'a>
+// {
+//     fn from(value: I) -> Self {
+//         IndexInput::Native(Box::new(value.into_iter()))
+//     }
+// }
+
+#[derive(Debug)]
+enum AppendMode {
+    Nodes,
+    Edges,
+    Both,
+}
+
 #[derive(Clone, Copy)]
 enum ExpansionPath {
     Nodes,
@@ -36,6 +59,7 @@ pub struct VectorisedGraph<G: StaticGraphViewOps, T: DocumentTemplate<G>> {
     pub(crate) graph_documents: Arc<Vec<DocumentRef>>,
     pub(crate) node_documents: Arc<HashMap<EntityId, Vec<DocumentRef>>>, // TODO: replace with FxHashMap
     pub(crate) edge_documents: Arc<HashMap<EntityId, Vec<DocumentRef>>>,
+    faiss_store: Option<Arc<RwLock<FaissStore>>>,
     selected_docs: Vec<(DocumentRef, f32)>,
     empty_vec: Vec<(DocumentRef, f32)>,
 }
@@ -53,6 +77,7 @@ impl<G: StaticGraphViewOps, T: DocumentTemplate<G>> Clone for VectorisedGraph<G, T>
 impl<G: StaticGraphViewOps, T: DocumentTemplate<G>> VectorisedGraph<G, T> {
         graph_documents: Arc<Vec<DocumentRef>>,
         node_documents: Arc<HashMap<EntityId, Vec<DocumentRef>>>,
         edge_documents: Arc<HashMap<EntityId, Vec<DocumentRef>>>,
+        faiss_store: Option<Arc<RwLock<FaissStore>>>,
         selected_docs: Vec<(DocumentRef, f32)>,
     ) -> Self {
         Self {
@@ -75,6 +101,7 @@ impl<G: StaticGraphViewOps, T: DocumentTemplate<G>> VectorisedGraph<G, T> {
             graph_documents,
             node_documents,
             edge_documents,
+            faiss_store,
             selected_docs,
             empty_vec: vec![],
         }
     }
@@ -181,6 +208,7 @@
         }
     }
 
+    // FIXME: this does not include graph documents as of now
     /// Add the top `limit` documents to the current selection using `query`
     ///
     /// # Arguments
@@ -196,8 +224,7 @@
         limit: usize,
         window: Option<(i64, i64)>,
     ) -> Self {
-        let joined = chain!(self.node_documents.iter(), self.edge_documents.iter());
-        self.add_top_documents(joined, query, limit, window)
+        self.add_top_documents(AppendMode::Both, query, limit, window)
     }
 
     /// Add the top `limit` node documents to the current selection using `query`
@@ -215,7 +242,7 @@
         limit: usize,
         window: Option<(i64, i64)>,
     ) -> Self {
-        self.add_top_documents(self.node_documents.as_ref(), query, limit, window)
+        self.add_top_documents(AppendMode::Nodes, query, limit, window)
     }
 
     /// Add the top `limit` edge documents to the current selection using `query`
@@ -233,7 +260,7 @@
         limit: usize,
         window: Option<(i64, i64)>,
     ) -> Self {
-        self.add_top_documents(self.edge_documents.as_ref(), query, limit, window)
+        self.add_top_documents(AppendMode::Edges, query, limit, window)
     }
 
     /// Add all the documents `hops` hops away to the selection
@@ -312,7 +339,7 @@
     ///
     /// # Arguments
     ///   * query - the text or the embedding to score against
-    ///   * limit - the maximum number of new documents to add 
+    ///   * limit - the maximum number of new documents to add
     ///   * window - the window where documents need to belong to in order to be considered
     ///
     /// # Returns
@@ -421,34 +448,68 @@
         }
     }
 
-    fn add_top_documents<'a, I>(
+    fn add_top_documents<'a>(
         &self,
-        document_groups: I,
+        mode: AppendMode,
         query: &Embedding,
         limit: usize,
         window: Option<(i64, i64)>,
-    ) -> Self
-    where
-        I: IntoIterator<Item = (&'a EntityId, &'a Vec<DocumentRef>)> + 'a,
-    {
-        let documents = document_groups
-            .into_iter()
-            .flat_map(|(_, embeddings)| embeddings);
+    ) -> Self {
+        // we don't want to use faiss if there is a window set
+        let valid_faiss_store = match window {
+            Some(_) => None,
+            None => self.faiss_store.clone(),
+        };
 
-        let window_docs: Box<dyn Iterator<Item = &DocumentRef>> = match window {
-            None => Box::new(documents),
-            Some((start, end)) => {
-                let windowed_graph = self.source_graph.window(start, end);
-                let filtered = documents.filter(move |document| {
-                    document.exists_on_window(Some(&windowed_graph), window)
+        let filtered: Box<dyn Iterator<Item = &DocumentRef>> = match valid_faiss_store {
+            None => {
+                let document_groups: Box<dyn Iterator<Item = (&EntityId, &Vec<DocumentRef>)>> =
+                    match mode {
+                        AppendMode::Nodes => Box::new(self.node_documents.iter()),
+                        AppendMode::Edges => Box::new(self.edge_documents.iter()),
+                        AppendMode::Both => Box::new(chain!(
+                            self.node_documents.iter(),
+                            self.edge_documents.iter()
+                        )),
+                    };
+                let documents = document_groups.flat_map(|(_, embeddings)| embeddings);
+                match window {
+                    None => Box::new(documents),
+                    Some((start, end)) => {
+                        let windowed_graph = self.source_graph.window(start, end);
+                        let filtered = documents.filter(move |document| {
+                            document.exists_on_window(Some(&windowed_graph), window)
+                        });
+                        Box::new(filtered)
+                    }
+                }
+            }
+            Some(store) => {
+                let mut store = store.write();
+                let pointers = match mode {
+                    AppendMode::Nodes => store.nodes.search(query, limit).into_iter(),
+                    AppendMode::Edges => store.edges.search(query, limit).into_iter(),
+                    AppendMode::Both => {
+                        let mut pointers = store.nodes.search(query, limit);
+                        pointers.extend(store.edges.search(query, limit));
+                        pointers.into_iter()
+                    }
+                };
+                let doc_refs = pointers.map(|DocumentPointer { entity, subindex }| {
+                    let doc_group = match entity {
+                        EntityId::Node { .. } => self.node_documents.get(&entity),
+                        EntityId::Edge { .. } => self.edge_documents.get(&entity),
+                        EntityId::Graph { .. } => panic!("unexpected graph document in the faiss store"),
+                    };
+                    doc_group.unwrap().get(subindex).unwrap()
                 });
-                Box::new(filtered)
+                Box::new(doc_refs.collect_vec().into_iter())
             }
         };
 
         let new_len = self.selected_docs.len() + limit;
-        let scored_nodes = score_documents(query, window_docs.cloned()); // TODO: try to remove this clone
-        let candidates = find_top_k(scored_nodes, usize::MAX);
+        let scored_docs = score_documents(query, filtered.cloned()); // TODO: try to remove this clone
+        let candidates = find_top_k(scored_docs, limit); // TODO: review, this used to be usize::MAX instead of limit
         let new_selected = extend_selection(self.selected_docs.clone(), candidates, new_len);
 
         Self {
@@ -457,6 +518,35 @@
             ..self.clone()
         }
     }
 
+    // fn native_search<'a, I>(
+    //     &self,
+    //     document_groups: I,
+    //     query: &Embedding,
+    //     limit: usize,
+    //     window: Option<(i64, i64)>,
+    // ) -> impl Iterator<Item = (DocumentRef, f32)>
+    // where
+    //     I: IntoIterator<Item = (&'a EntityId, &'a Vec<DocumentRef>)> + 'a,
+    // {
+    //     let documents = document_groups
+    //         .into_iter()
+    //         .flat_map(|(_, embeddings)| embeddings);

+    //     let window_docs: Box<dyn Iterator<Item = &DocumentRef>> = match window {
+    //         None => Box::new(documents),
+    //         Some((start, end)) => {
+    //             let windowed_graph = self.source_graph.window(start, end);
+    //             let filtered = documents.filter(move |document| {
+    //                 document.exists_on_window(Some(&windowed_graph), window)
+    //             });
+    //             Box::new(filtered)
+    //         }
+    //     };

+    //     let scored_docs = score_documents(query, window_docs.cloned()); // TODO: try to remove this clone
+    //     find_top_k(scored_docs, limit) // TODO: review, this used to be usize::MAX instead of limit
+    // }
+
     // this might return the document used as input, uniqueness need to be check outside of this
     fn get_context<'a, W: StaticGraphViewOps>(
         &'a self,
diff --git a/raphtory/src/vectors/vectorised_graph_storage.rs b/raphtory/src/vectors/vectorised_graph_storage.rs
index 4adda4ab20..5dc450be40 100644
--- a/raphtory/src/vectors/vectorised_graph_storage.rs
+++ b/raphtory/src/vectors/vectorised_graph_storage.rs
@@ -151,6 +151,7 @@ impl VectorisedGraphStorage {
             Arc::new(graph_documents),
             Arc::new(node_documents),
             Arc::new(edge_documents),
+            None, // FIXME: recompute the faiss store optionally
             vec![],
         ))
     }