Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb: Commit pipeline when write a WriteBatch for linearizability #677

Merged
merged 6 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 3 additions & 19 deletions librocksdb_sys/crocksdb/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1062,17 +1062,6 @@ void crocksdb_write(crocksdb_t* db, const crocksdb_writeoptions_t* options,
SaveError(errptr, db->rep->Write(options->rep, &batch->rep));
}

void crocksdb_write_multi_batch(crocksdb_t* db,
const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t** batches,
size_t batch_size, char** errptr) {
std::vector<WriteBatch*> ws;
for (size_t i = 0; i < batch_size; i++) {
ws.push_back(&batches[i]->rep);
}
SaveError(errptr, db->rep->MultiBatchWrite(options->rep, std::move(ws)));
}

char* crocksdb_get(crocksdb_t* db, const crocksdb_readoptions_t* options,
const char* key, size_t keylen, size_t* vallen,
char** errptr) {
Expand Down Expand Up @@ -2907,14 +2896,9 @@ void crocksdb_options_set_enable_pipelined_write(crocksdb_options_t* opt,
opt->rep.enable_pipelined_write = v;
}

void crocksdb_options_set_enable_multi_batch_write(crocksdb_options_t* opt,
unsigned char v) {
opt->rep.enable_multi_thread_write = v;
}

unsigned char crocksdb_options_is_enable_multi_batch_write(
crocksdb_options_t* opt) {
return opt->rep.enable_multi_thread_write;
void crocksdb_options_set_enable_pipelined_commit(crocksdb_options_t* opt,
unsigned char v) {
opt->rep.enable_pipelined_commit = v;
}

void crocksdb_options_set_unordered_write(crocksdb_options_t* opt,
Expand Down
8 changes: 1 addition & 7 deletions librocksdb_sys/crocksdb/crocksdb/c.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,6 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_write(
crocksdb_t* db, const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t* batch, char** errptr);

extern C_ROCKSDB_LIBRARY_API void crocksdb_write_multi_batch(
crocksdb_t* db, const crocksdb_writeoptions_t* options,
crocksdb_writebatch_t** batches, size_t batch_size, char** errptr);

/* Returns NULL if not found. A malloc()ed array otherwise.
Stores the length of the array in *vallen. */
extern C_ROCKSDB_LIBRARY_API char* crocksdb_get(
Expand Down Expand Up @@ -1214,10 +1210,8 @@ extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_bytes_per_sync(
crocksdb_options_t*, uint64_t);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_enable_pipelined_write(
crocksdb_options_t*, unsigned char);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_enable_multi_batch_write(
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_enable_pipelined_commit(
crocksdb_options_t* opt, unsigned char v);
extern C_ROCKSDB_LIBRARY_API unsigned char
crocksdb_options_is_enable_multi_batch_write(crocksdb_options_t* opt);
extern C_ROCKSDB_LIBRARY_API void crocksdb_options_set_unordered_write(
crocksdb_options_t*, unsigned char);
extern C_ROCKSDB_LIBRARY_API void
Expand Down
2 changes: 1 addition & 1 deletion librocksdb_sys/libtitan_sys/titan
10 changes: 1 addition & 9 deletions librocksdb_sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,8 +667,7 @@ extern "C" {
pub fn crocksdb_options_set_use_fsync(options: *mut Options, v: c_int);
pub fn crocksdb_options_set_bytes_per_sync(options: *mut Options, bytes: u64);
pub fn crocksdb_options_set_enable_pipelined_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_enable_multi_batch_write(options: *mut Options, v: bool);
pub fn crocksdb_options_is_enable_multi_batch_write(options: *mut Options) -> bool;
pub fn crocksdb_options_set_enable_pipelined_commit(options: *mut Options, v: bool);
pub fn crocksdb_options_set_unordered_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_allow_concurrent_memtable_write(options: *mut Options, v: bool);
pub fn crocksdb_options_set_manual_wal_flush(options: *mut Options, v: bool);
Expand Down Expand Up @@ -1158,13 +1157,6 @@ extern "C" {
batch: *mut DBWriteBatch,
err: *mut *mut c_char,
);
pub fn crocksdb_write_multi_batch(
db: *mut DBInstance,
writeopts: *const DBWriteOptions,
batch: *const *mut DBWriteBatch,
batchlen: size_t,
err: *mut *mut c_char,
);
pub fn crocksdb_writebatch_create() -> *mut DBWriteBatch;
pub fn crocksdb_writebatch_create_with_capacity(cap: size_t) -> *mut DBWriteBatch;
pub fn crocksdb_writebatch_create_from(rep: *const u8, size: size_t) -> *mut DBWriteBatch;
Expand Down
34 changes: 7 additions & 27 deletions src/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use crocksdb_ffi::{
self, DBBackupEngine, DBCFHandle, DBCache, DBCompressionType, DBEnv, DBInstance, DBMapProperty,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType, DBStatisticsTickerType,
DBTablePropertiesCollection, DBTitanDBOptions, DBWriteBatch,
DBTablePropertiesCollection, DBTitanDBOptions,
};
use libc::{self, c_char, c_int, c_void, size_t};
use librocksdb_sys::DBMemoryAllocator;
Expand Down Expand Up @@ -803,25 +803,6 @@ impl DB {
Ok(())
}

pub fn multi_batch_write(
&self,
batches: &[WriteBatch],
writeopts: &WriteOptions,
) -> Result<(), String> {
unsafe {
let b: Vec<*mut DBWriteBatch> = batches.iter().map(|w| w.inner).collect();
if !b.is_empty() {
ffi_try!(crocksdb_write_multi_batch(
self.inner,
writeopts.inner,
b.as_ptr(),
b.len()
));
}
}
Ok(())
}

pub fn write(&self, batch: &WriteBatch) -> Result<(), String> {
self.write_opt(batch, &WriteOptions::new())
}
Expand Down Expand Up @@ -3511,21 +3492,20 @@ mod test {
}

#[test]
fn test_multi_batch_write() {
fn test_commit_pipeline() {
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.enable_multi_batch_write(true);
let path = tempdir_with_prefix("_rust_rocksdb_multi_batch");
opts.enable_pipelined_commit(true);
opts.enable_pipelined_write(false);
let path = tempdir_with_prefix("_rust_rocksdb_commit_pipeline");

let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let cf = db.cf_handle("default").unwrap();
let mut data = Vec::new();
let w = WriteBatch::new();
for s in &[b"ab", b"cd", b"ef"] {
let w = WriteBatch::new();
w.put_cf(cf, s.to_vec().as_slice(), b"a").unwrap();
data.push(w);
}
db.multi_batch_write(&data, &WriteOptions::new()).unwrap();
db.write(&w).unwrap();
for s in &[b"ab", b"cd", b"ef"] {
let v = db.get_cf(cf, s.to_vec().as_slice()).unwrap();
assert!(v.is_some());
Expand Down
8 changes: 2 additions & 6 deletions src/rocksdb_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,16 +1176,12 @@ impl DBOptions {
}
}

pub fn enable_multi_batch_write(&self, v: bool) {
pub fn enable_pipelined_commit(&self, v: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_enable_multi_batch_write(self.inner, v);
crocksdb_ffi::crocksdb_options_set_enable_pipelined_commit(self.inner, v);
}
}

pub fn is_enable_multi_batch_write(&self) -> bool {
unsafe { crocksdb_ffi::crocksdb_options_is_enable_multi_batch_write(self.inner) }
}

pub fn enable_unordered_write(&self, v: bool) {
unsafe {
crocksdb_ffi::crocksdb_options_set_unordered_write(self.inner, v);
Expand Down