diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3198d4ac6349e..e4c4237c48315 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -102,6 +102,17 @@ dependencies = [ "alloc-no-stdlib", ] +[[package]] +name = "ambassador" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61874b33258f18ca7923047c12887078ccfe95c2811b03c1a09e309c19b7e50b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "ansi_term" version = "0.11.0" @@ -478,9 +489,9 @@ dependencies = [ [[package]] name = "bindgen" -version = "0.54.0" +version = "0.55.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66c0bb6167449588ff70803f4127f0684f9063097eca5016f37eb52b92c2cf36" +checksum = "75b13ce559e6433d360c26305643803cb52cfbabbc2b9c47ce04a58493dfb443" dependencies = [ "bitflags", "cexpr", @@ -692,9 +703,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "0.29.3" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6837df1d5cba2397b835c8530f51723267e16abbf83892e9e5af4f0e5dd10a" +checksum = "0659001ab56b791be01d4b729c44376edc6718cf389a502e579b77b758f3296c" dependencies = [ "glob", "libc", @@ -1015,7 +1026,7 @@ dependencies = [ "num", "parquet", "procspawn", - "rand 0.8.0", + "rand 0.8.1", "regex", "reqwest", "rocksdb", @@ -1067,6 +1078,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "delegate" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12e531288b600a8bea48baff926d2f16a3f68fda1cd2d59240279907ba727332" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "derivative" version = "2.1.3" @@ -1970,19 +1992,18 @@ checksum = "1482821306169ec4d07f6aca392a4681f66c75c9918aa49641a2595db64053cb" [[package]] name = "libloading" -version = "0.5.2" +version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2b111a074963af1d37a139918ac6d49ad1d0d5e47f72fd55388619691a7d753" +checksum = "e9367bdfa836b7e3cf895867f7a570283444da90562980ec2263d6e1569b16bc" dependencies = [ - "cc", + "cfg-if 1.0.0", "winapi 0.3.9", ] [[package]] name = "librocksdb-sys" version = "6.11.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb5b56f651c204634b936be2f92dbb42c36867e00ff7fe2405591f3b9fa66f09" +source = "git+https://github.com/cube-js/rust-rocksdb?branch=transaction#9d3e389328d2b8d0180dca781a794186b1868226" dependencies = [ "bindgen", "cc", @@ -2905,13 +2926,13 @@ dependencies = [ [[package]] name = "rand" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a76330fb486679b4ace3670f117bbc9e16204005c4bde9c4bd372f45bed34f12" +checksum = "c24fcd450d3fa2b592732565aa4f17a27a61c65ece4726353e000939b0edee34" dependencies = [ "libc", "rand_chacha 0.3.0", - "rand_core 0.6.0", + "rand_core 0.6.1", "rand_hc 0.3.0", ] @@ -2942,7 +2963,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.0", + "rand_core 0.6.1", ] [[package]] @@ -2971,9 +2992,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8b34ba8cfb21243bd8df91854c830ff0d785fff2e82ebd4434c2644cb9ada18" +checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" dependencies = [ "getrandom 0.2.1", ] @@ -3002,7 +3023,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.0", + "rand_core 0.6.1", ] [[package]] @@ -3187,9 +3208,10 @@ dependencies = [ [[package]] name = "rocksdb" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d83c02c429044d58474eaf5ae31e062d0de894e21125b47437ec0edc1397e6" +source = "git+https://github.com/cube-js/rust-rocksdb?branch=transaction#9d3e389328d2b8d0180dca781a794186b1868226" dependencies = [ + "ambassador", + "delegate", "libc", "librocksdb-sys", ] diff --git a/rust/cubestore/Cargo.toml b/rust/cubestore/Cargo.toml index efc88ecd666da..4de0b4c511f25 100644 --- a/rust/cubestore/Cargo.toml +++ b/rust/cubestore/Cargo.toml @@ -33,7 +33,7 @@ simple_logger = "1.7.0" async-trait = "0.1.36" actix-rt = "1.1.1" regex = "1.3.9" -rocksdb = { version = "0.15.0", default-features = false, features = ["bzip2"] } +rocksdb = { git = 'https://github.com/cube-js/rust-rocksdb', branch = 'transaction', version = "0.15.0-SNAPSHOT", default-features = false, features = ["bzip2"] } uuid = { version = "0.8", features = ["serde", "v4"] } num = "0.3.0" enum_primitive = "0.1.1" diff --git a/rust/cubestore/src/config/mod.rs b/rust/cubestore/src/config/mod.rs index 47c03608c31c1..e3fed541124cd 100644 --- a/rust/cubestore/src/config/mod.rs +++ b/rust/cubestore/src/config/mod.rs @@ -13,7 +13,7 @@ use crate::telemetry::{start_track_event_loop, stop_track_event_loop}; use crate::CubeError; use log::Level; use mockall::automock; -use rocksdb::{Options, DB}; +use rocksdb::{Options, DBUtils}; use simple_logger::SimpleLogger; use std::future::Future; use std::path::PathBuf; @@ -228,7 +228,7 @@ impl Config { services.stop_processing_loops().await.unwrap(); } - let _ = DB::destroy(&Options::default(), self.meta_store_path()); + let _ = DBUtils::destroy(&Options::default(), self.meta_store_path()); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); } diff --git a/rust/cubestore/src/metastore/chunks.rs b/rust/cubestore/src/metastore/chunks.rs index 6dd6e27542ce9..6cc6b8b340127 100644 --- a/rust/cubestore/src/metastore/chunks.rs +++ b/rust/cubestore/src/metastore/chunks.rs @@ -3,7 +3,7 @@ use crate::base_rocks_secondary_index; use crate::metastore::{IdRow, MetaStoreEvent}; use crate::rocks_table_impl; use byteorder::{BigEndian, WriteBytesExt}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer}; use std::io::Cursor; diff --git a/rust/cubestore/src/metastore/index.rs b/rust/cubestore/src/metastore/index.rs index 4d24a28f1d8a2..ae122b0e67916 100644 --- a/rust/cubestore/src/metastore/index.rs +++ b/rust/cubestore/src/metastore/index.rs @@ -4,7 +4,7 @@ use super::{ use crate::metastore::{IdRow, MetaStoreEvent}; use crate::{rocks_table_impl, CubeError}; use byteorder::{BigEndian, WriteBytesExt}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer}; use std::io::{Cursor, Write}; diff --git a/rust/cubestore/src/metastore/job.rs b/rust/cubestore/src/metastore/job.rs index 521198f4dc195..4c4feb9049574 100644 --- a/rust/cubestore/src/metastore/job.rs +++ b/rust/cubestore/src/metastore/job.rs @@ -4,7 +4,7 @@ use crate::metastore::{IdRow, MetaStoreEvent, RowKey}; use crate::rocks_table_impl; use byteorder::{BigEndian, WriteBytesExt}; use chrono::{DateTime, Utc}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer, Serialize}; use std::io::{Cursor, Write}; diff --git a/rust/cubestore/src/metastore/mod.rs b/rust/cubestore/src/metastore/mod.rs index fadae95cb6bf1..d12b41a101190 100644 --- a/rust/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/src/metastore/mod.rs @@ -10,10 +10,7 @@ pub mod wal; use async_trait::async_trait; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use log::{error, info}; -use rocksdb::{ - DBIterator, Direction, IteratorMode, MergeOperands, Options, ReadOptions, Snapshot, WriteBatch, - WriteBatchIterator, DB, -}; +use rocksdb::{prelude::*, TransactionDB, TransactionDBOptions, IteratorMode, Direction, WriteBatch, Snapshot, DBIterator, WriteBatchIterator, MergeOperands}; use serde::{Deserialize, Deserializer, Serialize}; use std::hash::{Hash, Hasher}; use std::{collections::hash_map::DefaultHasher, env, io::Cursor, sync::Arc, time}; @@ -137,11 +134,11 @@ macro_rules! rocks_table_impl { impl<'a> RocksTable for $rocks_table<'a> { type T = $table; - fn db(&self) -> &DB { + fn db(&self) -> &TransactionDB { self.db.db } - fn snapshot(&self) -> &rocksdb::Snapshot { + fn snapshot(&self) -> &rocksdb::Snapshot { self.db.snapshot } @@ -468,13 +465,13 @@ struct KeyVal { } struct BatchPipe<'a> { - db: &'a DB, + db: &'a TransactionDB, write_batch: WriteBatch, events: Vec, } impl<'a> BatchPipe<'a> { - fn new(db: &'a DB) -> BatchPipe<'a> { + fn new(db: &'a TransactionDB) -> BatchPipe<'a> { BatchPipe { db, write_batch: WriteBatch::default(), @@ -491,16 +488,18 @@ impl<'a> BatchPipe<'a> { } fn batch_write_rows(self) -> Result, CubeError> { - let db = self.db; - db.write(self.write_batch)?; + let tx = self.db.transaction(); + tx.write(self.write_batch)?; + tx.commit()?; + Ok(self.events) } } #[derive(Clone)] pub struct DbTableRef<'a> { - pub db: &'a DB, - pub snapshot: &'a Snapshot<'a>, + pub db: &'a TransactionDB, + pub snapshot: &'a Snapshot<'a, TransactionDB>, pub mem_seq: MemorySequence, } @@ -825,7 +824,7 @@ impl MemorySequence { #[derive(Clone)] pub struct RocksMetaStore { - pub db: Arc>>, + pub db: Arc>>, seq_store: Arc>>, listeners: Arc>>>, remote_fs: Arc, @@ -924,8 +923,8 @@ where trait RocksTable: Debug + Send + Sync { type T: Serialize + Clone + Debug + Send; fn delete_event(&self, row: IdRow) -> MetaStoreEvent; - fn db(&self) -> &DB; - fn snapshot(&self) -> &Snapshot; + fn db(&self) -> &TransactionDB; + fn snapshot(&self) -> &Snapshot; fn mem_seq(&self) -> &MemorySequence; fn index_id(&self, index_num: IndexId) -> IndexId; fn table_id(&self) -> TableId; @@ -1017,7 +1016,11 @@ trait RocksTable: Debug + Send + Sync { .to_vec(), id, ); - self.db().delete(secondary_index_key.to_bytes())?; + + let tx = self.db().transaction(); + tx.delete(secondary_index_key.to_bytes())?; + tx.commit()?; + return Err(CubeError::internal(format!( "Row exists in secondary index however missing in {:?} table: {}. Repairing index.", self, id @@ -1117,7 +1120,10 @@ trait RocksTable: Debug + Send + Sync { let mut to_write = vec![]; to_write.write_u64::(next_seq)?; - db.put(seq_key.to_bytes(), to_write)?; + + let tx = db.transaction(); + tx.put(seq_key.to_bytes(), to_write)?; + tx.commit()?; Ok(next_seq) } @@ -1158,8 +1164,9 @@ trait RocksTable: Debug + Send + Sync { } fn get_row(&self, row_id: u64) -> Result>, CubeError> { - let ref db = self.snapshot(); - let res = db.get(RowKey::Table(self.table_id(), row_id).to_bytes())?; + let tx = self.db().transaction(); + let res = tx.get(RowKey::Table(self.table_id(), row_id).to_bytes())?; + tx.commit()?; if let Some(buffer) = res { let row = self.deserialize_id_row(row_id, buffer.as_slice())?; @@ -1262,7 +1269,7 @@ trait RocksTable: Debug + Send + Sync { Ok(res) } - fn table_scan<'a>(&'a self, db: &'a Snapshot) -> Result, CubeError> { + fn table_scan<'a>(&'a self, db: &'a Snapshot) -> Result, CubeError> { let my_table_id = self.table_id(); let key_min = RowKey::Table(my_table_id, 0); @@ -1403,7 +1410,9 @@ impl RocksMetaStore { opts.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(13)); opts.set_merge_operator("meta_store merge", meta_store_merge, None); - let db = DB::open(&opts, path).unwrap(); + let txopts = TransactionDBOptions::default(); + + let db = TransactionDB::open_opt(&opts, path, &txopts).unwrap(); let db_arc = Arc::new(db); let meta_store = RocksMetaStore { @@ -1481,7 +1490,9 @@ impl RocksMetaStore { let batch = WriteBatchContainer::read_from_file(&path_to_log).await; if let Ok(batch) = batch { let db = meta_store.db.write().await; - db.write(batch.write_batch())?; + let tx = db.transaction(); + tx.write(batch.write_batch())?; + tx.commit()?; } else if let Err(e) = batch { error!( "Corrupted metastore WAL file. Discarding: {:?} {}", @@ -1643,7 +1654,7 @@ impl RocksMetaStore { } async fn upload_checkpoint( - db: Arc, + db: Arc, remote_fs: Arc, checkpoint_time: &SystemTime, ) -> Result<(), CubeError> { diff --git a/rust/cubestore/src/metastore/partition.rs b/rust/cubestore/src/metastore/partition.rs index 9fe1339016260..5693d81a221c3 100644 --- a/rust/cubestore/src/metastore/partition.rs +++ b/rust/cubestore/src/metastore/partition.rs @@ -6,7 +6,7 @@ use crate::metastore::{IdRow, MetaStoreEvent}; use crate::rocks_table_impl; use crate::table::Row; use byteorder::{BigEndian, WriteBytesExt}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer}; impl Partition { diff --git a/rust/cubestore/src/metastore/schema.rs b/rust/cubestore/src/metastore/schema.rs index a2ffb7c98641d..5ea45dcdb3b84 100644 --- a/rust/cubestore/src/metastore/schema.rs +++ b/rust/cubestore/src/metastore/schema.rs @@ -1,7 +1,7 @@ use super::{BaseRocksSecondaryIndex, IndexId, RocksSecondaryIndex, RocksTable, Schema, TableId}; use crate::metastore::{IdRow, MetaStoreEvent}; use crate::rocks_table_impl; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer}; impl Schema { diff --git a/rust/cubestore/src/metastore/table.rs b/rust/cubestore/src/metastore/table.rs index bd1646598cff0..afde04f4a877d 100644 --- a/rust/cubestore/src/metastore/table.rs +++ b/rust/cubestore/src/metastore/table.rs @@ -10,7 +10,7 @@ use crate::rocks_table_impl; use crate::store::DataFrame; use crate::table::Row; use byteorder::{BigEndian, WriteBytesExt}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer, Serialize}; use std::io::Write; use std::sync::Arc; diff --git a/rust/cubestore/src/metastore/wal.rs b/rust/cubestore/src/metastore/wal.rs index 7d16530fe6842..57de834c23474 100644 --- a/rust/cubestore/src/metastore/wal.rs +++ b/rust/cubestore/src/metastore/wal.rs @@ -3,7 +3,7 @@ use crate::base_rocks_secondary_index; use crate::metastore::{IdRow, MetaStoreEvent}; use crate::rocks_table_impl; use byteorder::{BigEndian, WriteBytesExt}; -use rocksdb::DB; +use rocksdb::TransactionDB; use serde::{Deserialize, Deserializer}; impl WAL { diff --git a/rust/cubestore/src/sql/mod.rs b/rust/cubestore/src/sql/mod.rs index 6b4860fcd5ae2..2576cb6a8f5d4 100644 --- a/rust/cubestore/src/sql/mod.rs +++ b/rust/cubestore/src/sql/mod.rs @@ -641,7 +641,7 @@ mod tests { use itertools::Itertools; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; - use rocksdb::{Options, DB}; + use rocksdb::{Options, TransactionDB}; use std::collections::HashSet; use std::fs::File; use std::io::Write; @@ -652,7 +652,7 @@ mod tests { #[actix_rt::test] async fn create_schema_test() { let path = "/tmp/test_create_schema"; - let _ = DB::destroy(&Options::default(), path); + let _ = TransactionDB::destroy(&Options::default(), path); let store_path = path.to_string() + &"_store".to_string(); let remote_store_path = path.to_string() + &"remote_store".to_string(); let _ = fs::remove_dir_all(store_path.clone()); @@ -681,7 +681,7 @@ mod tests { ]) ); } - let _ = DB::destroy(&Options::default(), path); + let _ = TransactionDB::destroy(&Options::default(), path); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); } @@ -689,7 +689,7 @@ mod tests { #[actix_rt::test] async fn create_table_test() { let path = "/tmp/test_create_table"; - let _ = DB::destroy(&Options::default(), path); + let _ = TransactionDB::destroy(&Options::default(), path); let store_path = path.to_string() + &"_store".to_string(); let remote_store_path = path.to_string() + &"remote_store".to_string(); let _ = fs::remove_dir_all(store_path.clone()); @@ -734,7 +734,7 @@ mod tests { TableValue::String("false".to_string()), ])); } - let _ = DB::destroy(&Options::default(), path); + let _ = TransactionDB::destroy(&Options::default(), path); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); } diff --git a/rust/cubestore/src/store/mod.rs b/rust/cubestore/src/store/mod.rs index 0f70a30011527..e45e9987f60b8 100644 --- a/rust/cubestore/src/store/mod.rs +++ b/rust/cubestore/src/store/mod.rs @@ -432,7 +432,7 @@ mod tests { use crate::metastore::RocksMetaStore; use crate::remotefs::LocalDirRemoteFs; use crate::{metastore::ColumnType, table::TableValue}; - use rocksdb::{Options, DB}; + use rocksdb::{Options, DBUtils}; use std::fs; use std::path::PathBuf; @@ -441,7 +441,7 @@ mod tests { let path = "/tmp/test_create_wal"; let store_path = path.to_string() + &"_store".to_string(); let remote_store_path = path.to_string() + &"remote_store".to_string(); - let _ = DB::destroy(&Options::default(), path); + let _ = DBUtils::destroy(&Options::default(), path); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); @@ -506,7 +506,7 @@ mod tests { let origin_data = DataFrame::new(col.clone(), first_rows); assert!(restored_wal == origin_data); } - let _ = DB::destroy(&Options::default(), path); + let _ = DBUtils::destroy(&Options::default(), path); let _ = fs::remove_dir_all(store_path.clone()); let _ = fs::remove_dir_all(remote_store_path.clone()); } @@ -519,7 +519,7 @@ mod tests { let chunk_store_path = path.to_string() + &"_store_chunk".to_string(); let chunk_remote_store_path = path.to_string() + &"_remote_store_chunk".to_string(); - let _ = DB::destroy(&Options::default(), path); + let _ = DBUtils::destroy(&Options::default(), path); let _ = fs::remove_dir_all(wal_store_path.clone()); let _ = fs::remove_dir_all(wal_remote_store_path.clone()); let _ = fs::remove_dir_all(chunk_store_path.clone()); @@ -598,7 +598,7 @@ mod tests { assert!(restored_chunk.data == restored_wal_sorted.data); assert!(restored_chunk.data != restored_wal_not_sorted.data); } - let _ = DB::destroy(&Options::default(), path); + let _ = DBUtils::destroy(&Options::default(), path); let _ = fs::remove_dir_all(wal_store_path.clone()); let _ = fs::remove_dir_all(wal_remote_store_path.clone()); let _ = fs::remove_dir_all(chunk_store_path.clone());