Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.2: storage: Use batches for direct RocksDB mutations #55745

Merged
merged 1 commit into from
Nov 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 59 additions & 14 deletions pkg/storage/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,16 @@ func (r *RocksDB) Attrs() roachpb.Attributes {
//
// It is safe to modify the contents of the arguments after Put returns.
func (r *RocksDB) Put(key MVCCKey, value []byte) error {
return dbPut(r.rdb, key, value)
if len(key.Key) == 0 {
return emptyKeyError()
}

b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Put(key, value); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// Merge implements the RocksDB merge operator using the function goMergeInit
Expand All @@ -549,7 +558,16 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error {
//
// It is safe to modify the contents of the arguments after Merge returns.
func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
return dbMerge(r.rdb, key, value)
if len(key.Key) == 0 {
return emptyKeyError()
}

b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Merge(key, value); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// LogData is part of the Writer interface.
Expand All @@ -571,7 +589,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail
// It is safe to modify the contents of the arguments after ApplyBatchRepr
// returns.
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
return dbApplyBatchRepr(r.rdb, repr, sync)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ApplyBatchRepr(repr, sync); err != nil {
return err
}
return b.Commit(sync)
}

// Get returns the value for the given key.
Expand All @@ -590,22 +613,43 @@ func (r *RocksDB) GetProto(
//
// It is safe to modify the contents of the arguments after Clear returns.
func (r *RocksDB) Clear(key MVCCKey) error {
return dbClear(r.rdb, key)
if len(key.Key) == 0 {
return emptyKeyError()
}
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Clear(key); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// SingleClear removes the most recent item from the db with the given key.
//
// It is safe to modify the contents of the arguments after SingleClear returns.
func (r *RocksDB) SingleClear(key MVCCKey) error {
return dbSingleClear(r.rdb, key)
if len(key.Key) == 0 {
return emptyKeyError()
}
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.SingleClear(key); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// ClearRange removes a set of entries, from start (inclusive) to end
// (exclusive).
//
// It is safe to modify the contents of the arguments after ClearRange returns.
func (r *RocksDB) ClearRange(start, end MVCCKey) error {
return dbClearRange(r.rdb, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ClearRange(start, end); err != nil {
return err
}
return b.Commit(true /* sync */)
}

// ClearIterRange removes a set of entries, from start (inclusive) to end
Expand All @@ -614,7 +658,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error {
// It is safe to modify the contents of the arguments after ClearIterRange
// returns.
func (r *RocksDB) ClearIterRange(iter Iterator, start, end roachpb.Key) error {
return dbClearIterRange(r.rdb, iter, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
err := r.Iterate(start, end, func(keyValue MVCCKeyValue) (bool, error) {
return false, b.Clear(keyValue.Key)
})
if err != nil {
return err
}
return b.Commit(true /* sync */)
}

// Iterate iterates from start to end keys, invoking f on each
Expand Down Expand Up @@ -2616,13 +2668,6 @@ func dbClear(rdb *C.DBEngine, key MVCCKey) error {
return statusToError(C.DBDelete(rdb, goToCKey(key)))
}

func dbSingleClear(rdb *C.DBEngine, key MVCCKey) error {
if len(key.Key) == 0 {
return emptyKeyError()
}
return statusToError(C.DBSingleDelete(rdb, goToCKey(key)))
}

func dbClearRange(rdb *C.DBEngine, start, end MVCCKey) error {
if err := statusToError(C.DBDeleteRange(rdb, goToCKey(start), goToCKey(end))); err != nil {
return err
Expand Down