Skip to content

Commit

Permalink
storage: Use batches for direct RocksDB mutations
Browse files Browse the repository at this point in the history
Currently, doing direct mutations on a RocksDB instance bypasses
custom batching / syncing logic that we've built on top of it.
This, or something internal to RocksDB, started leading to some bugs
when all direct mutations started passing in WriteOptions.sync = true
(see cockroachdb#55240 for when this change went in).

In this change, direct mutations still commit the batch with sync=true
to guarantee WAL syncing, but they go through the batch commit pipeline
too, just like the vast majority of operations already do.

Fixes cockroachdb#55362.

Release note: None.
  • Loading branch information
itsbilal committed Oct 20, 2020
1 parent 2ecba8f commit 5e0700e
Showing 1 changed file with 59 additions and 14 deletions.
73 changes: 59 additions & 14 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,16 @@ func (r *RocksDB) Attrs() roachpb.Attributes {
//
// It is safe to modify the contents of the arguments after Put returns.
func (r *RocksDB) Put(key MVCCKey, value []byte) error {
return dbPut(r.rdb, key, value)
if len(key.Key) == 0 {
return emptyKeyError()
}

b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Put(key, value); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// Merge implements the RocksDB merge operator using the function goMergeInit
Expand All @@ -817,7 +826,16 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error {
//
// It is safe to modify the contents of the arguments after Merge returns.
func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
return dbMerge(r.rdb, key, value)
if len(key.Key) == 0 {
return emptyKeyError()
}

b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Merge(key, value); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// LogData is part of the Writer interface.
Expand All @@ -839,7 +857,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail
// It is safe to modify the contents of the arguments after ApplyBatchRepr
// returns.
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
return dbApplyBatchRepr(r.rdb, repr, sync)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ApplyBatchRepr(repr, sync); err != nil {
return err
}
return b.Commit(sync)
}

// Get returns the value for the given key.
Expand All @@ -858,22 +881,43 @@ func (r *RocksDB) GetProto(
//
// It is safe to modify the contents of the arguments after Clear returns.
func (r *RocksDB) Clear(key MVCCKey) error {
return dbClear(r.rdb, key)
if len(key.Key) == 0 {
return emptyKeyError()
}
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Clear(key); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// SingleClear removes the most recent item from the db with the given key.
//
// It is safe to modify the contents of the arguments after SingleClear returns.
func (r *RocksDB) SingleClear(key MVCCKey) error {
return dbSingleClear(r.rdb, key)
if len(key.Key) == 0 {
return emptyKeyError()
}
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.SingleClear(key); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// ClearRange removes a set of entries, from start (inclusive) to end
// (exclusive).
//
// It is safe to modify the contents of the arguments after ClearRange returns.
func (r *RocksDB) ClearRange(start, end MVCCKey) error {
return dbClearRange(r.rdb, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ClearRange(start, end); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// ClearIterRange removes a set of entries, from start (inclusive) to end
Expand All @@ -882,7 +926,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error {
// It is safe to modify the contents of the arguments after ClearIterRange
// returns.
func (r *RocksDB) ClearIterRange(iter Iterator, start, end MVCCKey) error {
return dbClearIterRange(r.rdb, iter, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
err := r.Iterate(start, end, func(keyValue MVCCKeyValue) (bool, error) {
return false, b.Clear(keyValue.Key)
})
if err != nil {
return err
}
return b.Commit(true /* sync */)
}

// Iterate iterates from start to end keys, invoking f on each
Expand Down Expand Up @@ -2791,13 +2843,6 @@ func dbClear(rdb *C.DBEngine, key MVCCKey) error {
return statusToError(C.DBDelete(rdb, goToCKey(key)))
}

func dbSingleClear(rdb *C.DBEngine, key MVCCKey) error {
if len(key.Key) == 0 {
return emptyKeyError()
}
return statusToError(C.DBSingleDelete(rdb, goToCKey(key)))
}

func dbClearRange(rdb *C.DBEngine, start, end MVCCKey) error {
if err := statusToError(C.DBDeleteRange(rdb, goToCKey(start), goToCKey(end))); err != nil {
return err
Expand Down

0 comments on commit 5e0700e

Please sign in to comment.