Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_log_truncator.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (t *raftLogTruncator) durabilityAdvanced(ctx context.Context) {
// durable state after merging
// https://github.com/cockroachdb/pebble/pull/1490 and incorporating into
// CockroachDB.
reader := t.store.getEngine().NewReadOnly()
reader := t.store.getEngine().NewReadOnly(storage.StandardDurability)
defer reader.Close()
for _, rangeID := range ranges {
t.tryEnactTruncations(ctx, rangeID, reader)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func verifyRDReplicatedOnlyMVCCIter(
) {
t.Helper()
verify := func(t *testing.T, useSpanSet, reverse bool) {
readWriter := eng.NewReadOnly()
readWriter := eng.NewReadOnly(storage.StandardDurability)
defer readWriter.Close()
if useSpanSet {
var spans spanset.SpanSet
Expand Down Expand Up @@ -189,7 +189,7 @@ func verifyRDReplicatedOnlyMVCCIter(
func verifyRDEngineIter(
t *testing.T, desc *roachpb.RangeDescriptor, eng storage.Engine, expectedKeys []storage.MVCCKey,
) {
readWriter := eng.NewReadOnly()
readWriter := eng.NewReadOnly(storage.StandardDurability)
defer readWriter.Close()
iter := NewReplicaEngineDataIterator(desc, readWriter, false)
defer iter.Close()
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -174,7 +175,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rw := r.Engine().NewReadOnly()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

br, result, pErr :=
Expand Down Expand Up @@ -217,7 +218,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rw := r.Engine().NewReadOnly()
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

br, result, pErr := evaluateBatch(
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,
// and this method will always return at least one entry even if it exceeds
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
readonly := r.store.Engine().NewReadOnly()
readonly := r.store.Engine().NewReadOnly(storage.StandardDurability)
defer readonly.Close()
ctx := r.AnnotateCtx(context.TODO())
if r.raftMu.sideloaded == nil {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (r *replicaRaftStorage) Term(i uint64) (uint64, error) {
if e, ok := r.store.raftEntryCache.Get(r.RangeID, i); ok {
return e.Term, nil
}
readonly := r.store.Engine().NewReadOnly()
readonly := r.store.Engine().NewReadOnly(storage.StandardDurability)
defer readonly.Close()
ctx := r.AnnotateCtx(context.TODO())
return term(ctx, r.mu.stateLoader, readonly, r.RangeID, r.store.raftEntryCache, i)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (r *Replica) executeReadOnlyBatch(
// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
// designed.
rw := r.store.Engine().NewReadOnly()
rw := r.store.Engine().NewReadOnly(storage.StandardDurability)
if !rw.ConsistentIterators() {
// This is not currently needed for correctness, but future optimizations
// may start relying on this, so we assert here.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestReadOnlyBasics(t *testing.T) {
e := engineImpl.create()
defer e.Close()

ro := e.NewReadOnly()
ro := e.NewReadOnly(StandardDurability)
if ro.Closed() {
t.Fatal("read-only is expectedly found to be closed")
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,22 @@ type ReadWriter interface {
Writer
}

// DurabilityRequirement is an advanced option. If in doubt, use
// StandardDurability.
//
// GuaranteedDurability maps to pebble.IterOptions.OnlyReadGuaranteedDurable.
// This acknowledges the fact that we do not (without sacrificing correctness)
// sync the WAL for many writes, and there are some advanced cases
// (raftLogTruncator) that need visibility into what is guaranteed durable.
type DurabilityRequirement int8

const (
// StandardDurability is what should normally be used. It is the zero value
// of DurabilityRequirement.
StandardDurability DurabilityRequirement = iota
// GuaranteedDurability is an advanced option (only for raftLogTruncator).
GuaranteedDurability
)

// Engine is the interface that wraps the core operations of a key/value store.
type Engine interface {
ReadWriter
Expand Down Expand Up @@ -775,14 +791,15 @@ type Engine interface {
// them atomically on a call to Commit().
NewBatch() Batch
// NewReadOnly returns a new instance of a ReadWriter that wraps this
// engine. This wrapper panics when unexpected operations (e.g., write
// operations) are executed on it and caches iterators to avoid the overhead
// of creating multiple iterators for batched reads.
// engine, and with the given durability requirement. This wrapper panics
// when unexpected operations (e.g., write operations) are executed on it
// and caches iterators to avoid the overhead of creating multiple iterators
// for batched reads.
//
// All iterators created from a read-only engine are guaranteed to provide a
// consistent snapshot of the underlying engine. See the comment on the
// Reader interface and the Reader.ConsistentIterators method.
NewReadOnly() ReadWriter
NewReadOnly(durability DurabilityRequirement) ReadWriter
// NewUnindexedBatch returns a new instance of a batched engine which wraps
// this engine. It is unindexed, in that writes to the batch are not
// visible to reads until after it commits. The batch accumulates all
Expand Down Expand Up @@ -830,7 +847,13 @@ type Engine interface {
// addSSTablePreApply to select alternate code paths, but really there should
// be a unified code path there.
InMem() bool

// RegisterFlushCompletedCallback registers a callback that will be run for
// every successful flush. Only one callback can be registered at a time, so
// registering again replaces the previous callback. The callback must
// return quickly and must not call any methods on the Engine in the context
// of the callback since it could cause a deadlock (since the callback may
// be invoked while holding mutexes).
RegisterFlushCompletedCallback(cb func())
// Filesystem functionality.
fs.FS
// ReadFile reads the content from the file with the given filename in this RocksDB's env.
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func TestEngineScan1(t *testing.T) {
}

// Test iterator stats.
ro := engine.NewReadOnly()
ro := engine.NewReadOnly(StandardDurability)
iter := ro.NewMVCCIterator(MVCCKeyIterKind,
IterOptions{LowerBound: roachpb.Key("cat"), UpperBound: roachpb.Key("server")})
iter.SeekGE(MVCCKey{Key: roachpb.Key("cat")})
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func (i *intentInterleavingIter) SupportsPrev() bool {
// the identical engine state.
func newMVCCIteratorByCloningEngineIter(iter EngineIterator, opts IterOptions) MVCCIterator {
pIter := iter.GetRawIter()
it := newPebbleIterator(nil, pIter, opts)
it := newPebbleIterator(nil, pIter, opts, StandardDurability)
if iter == nil {
panic("couldn't create a new iterator")
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/intent_reader_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func (idw intentDemuxWriter) ClearMVCCRangeAndIntents(
// code probably uses an MVCCIterator.
type wrappableReader interface {
Reader
rawGet(key []byte) (value []byte, err error)
// rawMVCCGet is only used for Reader.MVCCGet which is deprecated and not
// performance sensitive.
rawMVCCGet(key []byte) (value []byte, err error)
}

// wrapReader wraps the provided reader, to return an implementation of MVCCIterator
Expand All @@ -126,7 +128,7 @@ var intentInterleavingReaderPool = sync.Pool{

// MVCCGet implements the Reader interface.
func (imr *intentInterleavingReader) MVCCGet(key MVCCKey) ([]byte, error) {
val, err := imr.wrappableReader.rawGet(EncodeMVCCKey(key))
val, err := imr.wrappableReader.rawMVCCGet(EncodeMVCCKey(key))
if val != nil || err != nil || !key.Timestamp.IsEmpty() {
return val, err
}
Expand Down
Loading