Skip to content

Commit

Permalink
storage: Use batches for direct RocksDB mutations
Browse files Browse the repository at this point in the history
Currently, doing direct mutations on a RocksDB instance bypasses
custom batching / syncing logic that we've built on top of it.
This, or something internal to RocksDB, started leading to some bugs
when all direct mutations started passing in WriteOptions.sync = true
(see cockroachdb#55240 for when this change went in).

In this change, direct mutations still commit the batch with sync=true
to guarantee WAL syncing, but they go through the batch commit pipeline
too, just like the vast majority of operations already do.

Fixes cockroachdb#55362.

Release note: None.
  • Loading branch information
itsbilal committed Oct 19, 2020
1 parent d6de3ec commit dae2f19
Showing 1 changed file with 45 additions and 7 deletions.
52 changes: 45 additions & 7 deletions pkg/storage/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,12 @@ func (r *RocksDB) Attrs() roachpb.Attributes {
//
// It is safe to modify the contents of the arguments after Put returns.
func (r *RocksDB) Put(key MVCCKey, value []byte) error {
return dbPut(r.rdb, key, value)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Put(key, value); err != nil {
return err
}
return b.Commit(true)
}

// Merge implements the RocksDB merge operator using the function goMergeInit
Expand All @@ -519,7 +524,12 @@ func (r *RocksDB) Put(key MVCCKey, value []byte) error {
//
// It is safe to modify the contents of the arguments after Merge returns.
func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
return dbMerge(r.rdb, key, value)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Merge(key, value); err != nil {
return err
}
return b.Commit(true)
}

// LogData is part of the Writer interface.
Expand All @@ -541,7 +551,12 @@ func (r *RocksDB) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalOpDetail
// It is safe to modify the contents of the arguments after ApplyBatchRepr
// returns.
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
return dbApplyBatchRepr(r.rdb, repr, sync)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ApplyBatchRepr(repr, sync); err != nil {
return err
}
return b.Commit(sync)
}

// Get returns the value for the given key.
Expand All @@ -560,22 +575,37 @@ func (r *RocksDB) GetProto(
//
// It is safe to modify the contents of the arguments after Clear returns.
func (r *RocksDB) Clear(key MVCCKey) error {
return dbClear(r.rdb, key)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.Clear(key); err != nil {
return err
}
return b.Commit(true)
}

// SingleClear removes the most recent item from the db with the given key.
//
// It is safe to modify the contents of the arguments after SingleClear returns.
func (r *RocksDB) SingleClear(key MVCCKey) error {
return dbSingleClear(r.rdb, key)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.SingleClear(key); err != nil {
return err
}
return b.Commit(true)
}

// ClearRange removes a set of entries, from start (inclusive) to end
// (exclusive).
//
// It is safe to modify the contents of the arguments after ClearRange returns.
func (r *RocksDB) ClearRange(start, end MVCCKey) error {
return dbClearRange(r.rdb, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
if err := b.ClearRange(start, end); err != nil {
return err
}
return b.Commit(true)
}

// ClearIterRange removes a set of entries, from start (inclusive) to end
Expand All @@ -584,7 +614,15 @@ func (r *RocksDB) ClearRange(start, end MVCCKey) error {
// It is safe to modify the contents of the arguments after ClearIterRange
// returns.
func (r *RocksDB) ClearIterRange(iter Iterator, start, end roachpb.Key) error {
return dbClearIterRange(r.rdb, iter, start, end)
b := r.NewWriteOnlyBatch()
defer b.Close()
err := r.Iterate(start, end, func(keyValue MVCCKeyValue) error {
return b.Clear(keyValue.Key)
})
if err != nil {
return err
}
return b.Commit(true)
}

// Iterate iterates from start to end keys, invoking f on each
Expand Down

0 comments on commit dae2f19

Please sign in to comment.