From 897879788ba9697e63413780b8924c7d4c9a6c7f Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 19 Oct 2020 15:52:11 -0400 Subject: [PATCH] storage: Use batches for direct RocksDB mutations Currently, doing direct mutations on a RocksDB instance bypasses custom batching / syncing logic that we've built on top of it. This, or something internal to RocksDB, started leading to some bugs when all direct mutations started passing in WriteOptions.sync = true (see #55240 for when this change went in). In this change, direct mutations still commit the batch with sync=true to guarantee WAL syncing, but they go through the batch commit pipeline too, just like the vast majority of operations already do. Fixes #55362. Release note: None. --- c-deps/libroach/engine.cc | 5 --- pkg/storage/rocksdb.go | 73 +++++++++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/c-deps/libroach/engine.cc b/c-deps/libroach/engine.cc index 184b688e773a..b4f9aba56a7b 100644 --- a/c-deps/libroach/engine.cc +++ b/c-deps/libroach/engine.cc @@ -136,13 +136,11 @@ DBStatus DBImpl::AssertPreClose() { DBStatus DBImpl::Put(DBKey key, DBSlice value) { rocksdb::WriteOptions options; - options.sync = true; return ToDBStatus(rep->Put(options, EncodeKey(key), ToSlice(value))); } DBStatus DBImpl::Merge(DBKey key, DBSlice value) { rocksdb::WriteOptions options; - options.sync = true; return ToDBStatus(rep->Merge(options, EncodeKey(key), ToSlice(value))); } @@ -154,19 +152,16 @@ DBStatus DBImpl::Get(DBKey key, DBString* value) { DBStatus DBImpl::Delete(DBKey key) { rocksdb::WriteOptions options; - options.sync = true; return ToDBStatus(rep->Delete(options, EncodeKey(key))); } DBStatus DBImpl::SingleDelete(DBKey key) { rocksdb::WriteOptions options; - options.sync = true; return ToDBStatus(rep->SingleDelete(options, EncodeKey(key))); } DBStatus DBImpl::DeleteRange(DBKey start, DBKey end) { rocksdb::WriteOptions options; - options.sync = true; return ToDBStatus( rep->DeleteRange(options, rep->DefaultColumnFamily(), EncodeKey(start), EncodeKey(end))); } diff --git a/pkg/storage/rocksdb.go b/pkg/storage/rocksdb.go index 3585e1717ee2..3607d8b4ce83 100644 --- a/pkg/storage/rocksdb.go +++ b/pkg/storage/rocksdb.go @@ -508,7 +508,16 @@ func (r *RocksDB) Attrs() roachpb.Attributes { // // It is safe to modify the contents of the arguments after Put returns. func (r *RocksDB) Put(key MVCCKey, value []byte) error { - return dbPut(r.rdb, key, value) + if len(key.Key) == 0 { + return emptyKeyError() + } + + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Put(key, value); err != nil { + return err + } + return b.Commit(true /* sync */) } // Merge implements the RocksDB merge operator using the function goMergeInit @@ -519,7 +528,16 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error { // // It is safe to modify the contents of the arguments after Merge returns. func (r *RocksDB) Merge(key MVCCKey, value []byte) error { - return dbMerge(r.rdb, key, value) + if len(key.Key) == 0 { + return emptyKeyError() + } + + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Merge(key, value); err != nil { + return err + } + return b.Commit(true /* sync */) } // LogData is part of the Writer interface. @@ -541,7 +559,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail // It is safe to modify the contents of the arguments after ApplyBatchRepr // returns. func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error { - return dbApplyBatchRepr(r.rdb, repr, sync) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.ApplyBatchRepr(repr, sync); err != nil { + return err + } + return b.Commit(sync) } // Get returns the value for the given key. @@ -560,14 +583,30 @@ func (r *RocksDB) GetProto( // // It is safe to modify the contents of the arguments after Clear returns. func (r *RocksDB) Clear(key MVCCKey) error { - return dbClear(r.rdb, key) + if len(key.Key) == 0 { + return emptyKeyError() + } + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Clear(key); err != nil { + return err + } + return b.Commit(true /* sync */) } // SingleClear removes the most recent item from the db with the given key. // // It is safe to modify the contents of the arguments after SingleClear returns. func (r *RocksDB) SingleClear(key MVCCKey) error { - return dbSingleClear(r.rdb, key) + if len(key.Key) == 0 { + return emptyKeyError() + } + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.SingleClear(key); err != nil { + return err + } + return b.Commit(true /* sync */) } // ClearRange removes a set of entries, from start (inclusive) to end @@ -575,7 +614,12 @@ func (r *RocksDB) SingleClear(key MVCCKey) error { // // It is safe to modify the contents of the arguments after ClearRange returns. func (r *RocksDB) ClearRange(start, end MVCCKey) error { - return dbClearRange(r.rdb, start, end) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.ClearRange(start, end); err != nil { + return err + } + return b.Commit(true /* sync */) } // ClearIterRange removes a set of entries, from start (inclusive) to end @@ -584,7 +628,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error { // It is safe to modify the contents of the arguments after ClearIterRange // returns. func (r *RocksDB) ClearIterRange(iter Iterator, start, end roachpb.Key) error { - return dbClearIterRange(r.rdb, iter, start, end) + b := r.NewWriteOnlyBatch() + defer b.Close() + err := r.Iterate(start, end, func(keyValue MVCCKeyValue) error { + return b.Clear(keyValue.Key) + }) + if err != nil { + return err + } + return b.Commit(true /* sync */) } // Iterate iterates from start to end keys, invoking f on each @@ -2577,13 +2629,6 @@ func dbClear(rdb *C.DBEngine, key MVCCKey) error { return statusToError(C.DBDelete(rdb, goToCKey(key))) } -func dbSingleClear(rdb *C.DBEngine, key MVCCKey) error { - if len(key.Key) == 0 { - return emptyKeyError() - } - return statusToError(C.DBSingleDelete(rdb, goToCKey(key))) -} - func dbClearRange(rdb *C.DBEngine, start, end MVCCKey) error { if err := statusToError(C.DBDeleteRange(rdb, goToCKey(start), goToCKey(end))); err != nil { return err