Skip to content

Commit

Permalink
storage: Use batches for direct RocksDB mutations
Browse files Browse the repository at this point in the history
Currently, doing direct mutations on a RocksDB instance bypasses
custom batching / syncing logic that we've built on top of it.
This, or something internal to RocksDB, started leading to some bugs
when all direct mutations started passing in WriteOptions.sync = true
(see #55240 for when this change went in).

In this change, direct mutations still commit the batch with sync=true
to guarantee WAL syncing, but they go through the batch commit pipeline
too, just like the vast majority of operations already do.

Fixes #55362.

Release note: None.
  • Loading branch information
itsbilal committed Oct 20, 2020
1 parent d6de3ec commit 8978797
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 19 deletions.
5 changes: 0 additions & 5 deletions c-deps/libroach/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,11 @@ DBStatus DBImpl::AssertPreClose() {

// Put writes value under key by calling rep->Put with the encoded key and
// returns the resulting status converted via ToDBStatus.
// NOTE(review): this listing reads like a diff whose +/- markers were
// stripped. The file summary reports only deletions for this file, so the
// `options.sync = true` line below is presumably the line being removed,
// leaving default WriteOptions — confirm against the commit.
DBStatus DBImpl::Put(DBKey key, DBSlice value) {
rocksdb::WriteOptions options;
options.sync = true;
return ToDBStatus(rep->Put(options, EncodeKey(key), ToSlice(value)));
}

// Merge applies a merge operand for key by calling rep->Merge with the
// encoded key and returns the resulting status converted via ToDBStatus.
// NOTE(review): this listing reads like a diff whose +/- markers were
// stripped. The file summary reports only deletions for this file, so the
// `options.sync = true` line below is presumably the line being removed —
// confirm against the commit.
DBStatus DBImpl::Merge(DBKey key, DBSlice value) {
rocksdb::WriteOptions options;
options.sync = true;
return ToDBStatus(rep->Merge(options, EncodeKey(key), ToSlice(value)));
}

Expand All @@ -154,19 +152,16 @@ DBStatus DBImpl::Get(DBKey key, DBString* value) {

// Delete removes key by calling rep->Delete with the encoded key and returns
// the resulting status converted via ToDBStatus.
// NOTE(review): this listing reads like a diff whose +/- markers were
// stripped. The file summary reports only deletions for this file, so the
// `options.sync = true` line below is presumably the line being removed —
// confirm against the commit.
DBStatus DBImpl::Delete(DBKey key) {
rocksdb::WriteOptions options;
options.sync = true;
return ToDBStatus(rep->Delete(options, EncodeKey(key)));
}

// SingleDelete calls rep->SingleDelete with the encoded key and returns the
// resulting status converted via ToDBStatus.
// NOTE(review): this listing reads like a diff whose +/- markers were
// stripped. The file summary reports only deletions for this file, so the
// `options.sync = true` line below is presumably the line being removed —
// confirm against the commit.
DBStatus DBImpl::SingleDelete(DBKey key) {
rocksdb::WriteOptions options;
options.sync = true;
return ToDBStatus(rep->SingleDelete(options, EncodeKey(key)));
}

// DeleteRange calls rep->DeleteRange on the default column family with the
// encoded start and end keys and returns the status converted via ToDBStatus.
// NOTE(review): this listing reads like a diff whose +/- markers were
// stripped. The file summary reports only deletions for this file, so the
// `options.sync = true` line below is presumably the line being removed —
// confirm against the commit.
DBStatus DBImpl::DeleteRange(DBKey start, DBKey end) {
rocksdb::WriteOptions options;
options.sync = true;
return ToDBStatus(
rep->DeleteRange(options, rep->DefaultColumnFamily(), EncodeKey(start), EncodeKey(end)));
}
Expand Down
73 changes: 59 additions & 14 deletions pkg/storage/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,16 @@ func (r *RocksDB) Attrs() roachpb.Attributes {
//
// It is safe to modify the contents of the arguments after Put returns.
func (r *RocksDB) Put(key MVCCKey, value []byte) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbPut(r.rdb, key, value)
// An empty key is rejected before any batch is created.
if len(key.Key) == 0 {
return emptyKeyError()
}

// Stage the write in a write-only batch instead of mutating the engine
// directly; the deferred Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Put(key, value); err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// Merge implements the RocksDB merge operator using the function goMergeInit
Expand All @@ -519,7 +528,16 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error {
//
// It is safe to modify the contents of the arguments after Merge returns.
func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbMerge(r.rdb, key, value)
// An empty key is rejected before any batch is created.
if len(key.Key) == 0 {
return emptyKeyError()
}

// Stage the merge in a write-only batch instead of mutating the engine
// directly; the deferred Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Merge(key, value); err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// LogData is part of the Writer interface.
Expand All @@ -541,7 +559,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail
// It is safe to modify the contents of the arguments after ApplyBatchRepr
// returns.
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbApplyBatchRepr(r.rdb, repr, sync)
// Apply the serialized batch to a fresh write-only batch rather than to the
// engine directly; the deferred Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
// The caller's sync flag is forwarded both to the inner apply and to the
// final commit below.
if err := b.ApplyBatchRepr(repr, sync); err != nil {
return err
}
return b.Commit(sync)
}

// Get returns the value for the given key.
Expand All @@ -560,22 +583,43 @@ func (r *RocksDB) GetProto(
//
// It is safe to modify the contents of the arguments after Clear returns.
func (r *RocksDB) Clear(key MVCCKey) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbClear(r.rdb, key)
// An empty key is rejected before any batch is created.
if len(key.Key) == 0 {
return emptyKeyError()
}
// Stage the deletion in a write-only batch instead of mutating the engine
// directly; the deferred Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Clear(key); err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// SingleClear removes the most recent item from the db with the given key.
//
// It is safe to modify the contents of the arguments after SingleClear returns.
func (r *RocksDB) SingleClear(key MVCCKey) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbSingleClear(r.rdb, key)
// An empty key is rejected before any batch is created.
if len(key.Key) == 0 {
return emptyKeyError()
}
// Stage the single-delete in a write-only batch instead of mutating the
// engine directly; the deferred Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.SingleClear(key); err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// ClearRange removes a set of entries, from start (inclusive) to end
// (exclusive).
//
// It is safe to modify the contents of the arguments after ClearRange returns.
func (r *RocksDB) ClearRange(start, end MVCCKey) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbClearRange(r.rdb, start, end)
// Stage the range deletion in a write-only batch; the deferred Close runs on
// every return path. No empty-key check is performed here before the batch
// call, unlike the single-key mutations shown in the same hunk.
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ClearRange(start, end); err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// ClearIterRange removes a set of entries, from start (inclusive) to end
Expand All @@ -584,7 +628,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error {
// It is safe to modify the contents of the arguments after ClearIterRange
// returns.
func (r *RocksDB) ClearIterRange(iter Iterator, start, end roachpb.Key) error {
// NOTE(review): this hunk reads like a diff whose +/- markers were stripped.
// The next line is presumably the pre-change implementation that the commit
// deletes; read literally, it returns first and everything after it is
// unreachable. Confirm against the commit before treating this as live code.
return dbClearIterRange(r.rdb, iter, start, end)
// Collect one Clear per visited key into a write-only batch; the deferred
// Close runs on every return path.
b := r.NewWriteOnlyBatch()
defer b.Close()
// NOTE(review): the lines below walk [start, end) via r.Iterate and never
// reference the iter argument — confirm that ignoring the caller-supplied
// iterator is intended.
err := r.Iterate(start, end, func(keyValue MVCCKeyValue) error {
return b.Clear(keyValue.Key)
})
if err != nil {
return err
}
// Per the commit message, committing with sync=true keeps the WAL-sync
// guarantee while routing the write through the batch commit pipeline.
return b.Commit(true /* sync */)
}

// Iterate iterates from start to end keys, invoking f on each
Expand Down Expand Up @@ -2577,13 +2629,6 @@ func dbClear(rdb *C.DBEngine, key MVCCKey) error {
return statusToError(C.DBDelete(rdb, goToCKey(key)))
}

// dbSingleClear issues a single-delete for key directly against the C engine
// handle via C.DBSingleDelete and converts the status with statusToError.
// It returns emptyKeyError() when key.Key is empty.
// NOTE(review): the hunk header for this region shows it shrinking by seven
// lines, so this helper is presumably deleted by the commit — confirm.
func dbSingleClear(rdb *C.DBEngine, key MVCCKey) error {
if len(key.Key) == 0 {
return emptyKeyError()
}
return statusToError(C.DBSingleDelete(rdb, goToCKey(key)))
}

func dbClearRange(rdb *C.DBEngine, start, end MVCCKey) error {
if err := statusToError(C.DBDeleteRange(rdb, goToCKey(start), goToCKey(end))); err != nil {
return err
Expand Down

0 comments on commit 8978797

Please sign in to comment.