From dae2f19cd8d72b7c1b8f7daa2bc2316856ce7f62 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Mon, 19 Oct 2020 15:52:11 -0400 Subject: [PATCH] storage: Use batches for direct RocksDB mutations Currently, doing direct mutations on a RocksDB instance bypasses custom batching / syncing logic that we've built on top of it. This, or something internal to RocksDB, started leading to some bugs when all direct mutations started passing in WriteOptions.sync = true (see #55240 for when this change went in). In this change, direct mutations still commit the batch with sync=true to guarantee WAL syncing, but they go through the batch commit pipeline too, just like the vast majority of operations already do. Fixes #55362. Release note: None. --- pkg/storage/rocksdb.go | 52 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/pkg/storage/rocksdb.go b/pkg/storage/rocksdb.go index 3585e1717ee2..85619f1d98e4 100644 --- a/pkg/storage/rocksdb.go +++ b/pkg/storage/rocksdb.go @@ -508,7 +508,12 @@ func (r *RocksDB) Attrs() roachpb.Attributes { // // It is safe to modify the contents of the arguments after Put returns. func (r *RocksDB) Put(key MVCCKey, value []byte) error { - return dbPut(r.rdb, key, value) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Put(key, value); err != nil { + return err + } + return b.Commit(true) } // Merge implements the RocksDB merge operator using the function goMergeInit @@ -519,7 +524,12 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error { // // It is safe to modify the contents of the arguments after Merge returns. func (r *RocksDB) Merge(key MVCCKey, value []byte) error { - return dbMerge(r.rdb, key, value) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Merge(key, value); err != nil { + return err + } + return b.Commit(true) } // LogData is part of the Writer interface. @@ -541,7 +551,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail // It is safe to modify the contents of the arguments after ApplyBatchRepr // returns. func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error { - return dbApplyBatchRepr(r.rdb, repr, sync) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.ApplyBatchRepr(repr, sync); err != nil { + return err + } + return b.Commit(sync) } // Get returns the value for the given key. @@ -560,14 +575,24 @@ func (r *RocksDB) GetProto( // // It is safe to modify the contents of the arguments after Clear returns. func (r *RocksDB) Clear(key MVCCKey) error { - return dbClear(r.rdb, key) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.Clear(key); err != nil { + return err + } + return b.Commit(true) } // SingleClear removes the most recent item from the db with the given key. // // It is safe to modify the contents of the arguments after SingleClear returns. func (r *RocksDB) SingleClear(key MVCCKey) error { - return dbSingleClear(r.rdb, key) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.SingleClear(key); err != nil { + return err + } + return b.Commit(true) } // ClearRange removes a set of entries, from start (inclusive) to end @@ -575,7 +600,12 @@ func (r *RocksDB) SingleClear(key MVCCKey) error { // // It is safe to modify the contents of the arguments after ClearRange returns. func (r *RocksDB) ClearRange(start, end MVCCKey) error { - return dbClearRange(r.rdb, start, end) + b := r.NewWriteOnlyBatch() + defer b.Close() + if err := b.ClearRange(start, end); err != nil { + return err + } + return b.Commit(true) } // ClearIterRange removes a set of entries, from start (inclusive) to end @@ -584,7 +614,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error { // It is safe to modify the contents of the arguments after ClearIterRange // returns. func (r *RocksDB) ClearIterRange(iter Iterator, start, end roachpb.Key) error { - return dbClearIterRange(r.rdb, iter, start, end) + b := r.NewWriteOnlyBatch() + defer b.Close() + err := r.Iterate(start, end, func(keyValue MVCCKeyValue) error { + return b.Clear(keyValue.Key) + }) + if err != nil { + return err + } + return b.Commit(true) } // Iterate iterates from start to end keys, invoking f on each