Merge #55871 #55972
55871: kvserver: remove the replica.mu.draining field r=andreimatei a=andreimatei

It was only used in one random place. Generally everybody looks at the
store's draining status.

Release note: None

55972: storage: rename Iterator to MVCCIterator r=sumeerbhola a=sumeerbhola

Also remove the old MVCCIterator interface, which was implemented only for
RocksDB, freeing the name for the renamed interface.

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: sumeerbhola <sumeer@cockroachlabs.com>
3 people committed Oct 26, 2020
3 parents 4a58b36 + af594d8 + 4fd71e4 commit 753ab8b
Showing 35 changed files with 209 additions and 291 deletions.
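
Before the per-file diffs, a minimal sketch of the rename as it is used in the changes below. The method sets here are illustrative assumptions, not the actual storage package definitions:

// Sketch only: MVCCKey and the method sets are illustrative stand-ins.
package storage

// MVCCKey stands in for the package's versioned key type.
type MVCCKey struct {
    Key       []byte
    Timestamp int64 // the real type carries an hlc.Timestamp
}

// SimpleMVCCIterator (previously SimpleIterator) is the narrow subset that
// callers such as the rangefeed catch-up scan need.
type SimpleMVCCIterator interface {
    SeekGE(key MVCCKey)   // position at the first key >= the given key
    Valid() (bool, error) // whether the iterator is positioned at a key
    Next()                // advance to the next key/value
    Close()               // release underlying resources
}

// MVCCIterator (previously Iterator) extends the simple interface with the
// rest of the iteration API, elided here.
type MVCCIterator interface {
    SimpleMVCCIterator
    // ...
}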
6 changes: 3 additions & 3 deletions pkg/ccl/storageccl/engineccl/bench_test.go
@@ -125,7 +125,7 @@ func loadTestData(
func runIterate(
b *testing.B,
loadFactor float32,
-makeIterator func(storage.Engine, hlc.Timestamp, hlc.Timestamp) storage.Iterator,
+makeIterator func(storage.Engine, hlc.Timestamp, hlc.Timestamp) storage.MVCCIterator,
) {
const numKeys = 100000
const numBatches = 100
@@ -173,12 +173,12 @@ func BenchmarkTimeBoundIterate(b *testing.B) {
for _, loadFactor := range []float32{1.0, 0.5, 0.1, 0.05, 0.0} {
b.Run(fmt.Sprintf("LoadFactor=%.2f", loadFactor), func(b *testing.B) {
b.Run("NormalIterator", func(b *testing.B) {
-runIterate(b, loadFactor, func(e storage.Engine, _, _ hlc.Timestamp) storage.Iterator {
+runIterate(b, loadFactor, func(e storage.Engine, _, _ hlc.Timestamp) storage.MVCCIterator {
return e.NewIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
})
})
b.Run("TimeBoundIterator", func(b *testing.B) {
-runIterate(b, loadFactor, func(e storage.Engine, startTime, endTime hlc.Timestamp) storage.Iterator {
+runIterate(b, loadFactor, func(e storage.Engine, startTime, endTime hlc.Timestamp) storage.MVCCIterator {
return e.NewIterator(storage.IterOptions{
MinTimestampHint: startTime,
MaxTimestampHint: endTime,
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/import.go
@@ -142,7 +142,7 @@ func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.Impo
}
defer cArgs.EvalCtx.GetLimiters().ConcurrentImportRequests.Finish()

-var iters []storage.SimpleIterator
+var iters []storage.SimpleMVCCIterator
for _, file := range args.Files {
log.VEventf(ctx, 2, "import file %s %s", file.Path, args.Key)

4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/writebatch.go
@@ -112,10 +112,10 @@ func clearExistingData(
}

log.Eventf(ctx, "target key range not empty, will clear existing data: %+v", existingStats)
-// If this is a Iterator, we have to unwrap it because
+// If this is a MVCCIterator, we have to unwrap it because
// ClearIterRange needs a plain rocksdb iterator (and can't unwrap
// it itself because of import cycles).
-if ssi, ok := iter.(*spanset.Iterator); ok {
+if ssi, ok := iter.(*spanset.MVCCIterator); ok {
iter = ssi.Iterator()
}
// TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
@@ -476,7 +476,7 @@ func createSplitSSTable(
db SSTSender,
start, splitKey roachpb.Key,
disallowShadowing bool,
-iter storage.SimpleIterator,
+iter storage.SimpleMVCCIterator,
settings *cluster.Settings,
) (*sstSpan, *sstSpan, error) {
sstFile := &storage.MemFile{}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batch_spanset_test.go
@@ -181,7 +181,7 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
iter := batch.NewIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()

-// Iterators check boundaries on seek and next/prev
+// MVCCIterators check boundaries on seek and next/prev
iter.SeekGE(outsideKey)
if _, err := iter.Valid(); !isReadSpanErr(err) {
t.Fatalf("Seek: unexpected error %v", err)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/intent_test.go
@@ -32,7 +32,7 @@ type instrumentedEngine struct {
// ... can be extended ...
}

-func (ie *instrumentedEngine) NewIterator(opts storage.IterOptions) storage.Iterator {
+func (ie *instrumentedEngine) NewIterator(opts storage.IterOptions) storage.MVCCIterator {
if ie.onNewIterator != nil {
ie.onNewIterator(opts)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/below_raft_protos_test.go
@@ -113,7 +113,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{

func init() {
// These are marshaled below Raft by the Pebble merge operator. The Pebble
-// merge operator can be called below Raft whenever a Pebble Iterator is
+// merge operator can be called below Raft whenever a Pebble MVCCIterator is
// used.
belowRaftGoldenProtos[reflect.TypeOf(&roachpb.InternalTimeSeriesData{})] = fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
@@ -179,7 +179,7 @@ func NewProcessor(cfg Config) *Processor {

// IteratorConstructor is used to construct an iterator. It should be called
// from underneath a stopper task to ensure that the engine has not been closed.
-type IteratorConstructor func() storage.SimpleIterator
+type IteratorConstructor func() storage.SimpleMVCCIterator

// Start launches a goroutine to process rangefeed events and send them to
// registrations.
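
The IteratorConstructor comment above carries the key constraint: the constructor must be called from under a stopper task so the engine cannot be closed while the iterator is being created. A self-contained sketch of that pattern, with toy stand-ins for the stopper and engine rather than the CockroachDB APIs:

package main

import (
    "errors"
    "fmt"
)

// SimpleMVCCIterator, iter, and engine are toy stand-ins for the real types.
type SimpleMVCCIterator interface{ Close() }

type iter struct{}

func (*iter) Close() {}

type engine struct{}

func (*engine) newIterator() SimpleMVCCIterator { return &iter{} }

// stopper is a toy task tracker: tasks refuse to start once shutdown has
// begun, which is what makes calling the constructor inside a task safe.
type stopper struct{ stopping bool }

func (s *stopper) runTask(f func()) error {
    if s.stopping {
        return errors.New("stopper draining: task not started")
    }
    f()
    return nil
}

// IteratorConstructor mirrors the type in the diff: it defers iterator
// creation until the processor actually needs it.
type IteratorConstructor func() SimpleMVCCIterator

func main() {
    e, s := &engine{}, &stopper{}
    ctor := IteratorConstructor(func() SimpleMVCCIterator { return e.newIterator() })

    // Creating and using the iterator inside a task means the engine
    // outlives it: shutdown waits for running tasks and rejects new ones.
    if err := s.runTask(func() {
        it := ctor()
        defer it.Close()
        fmt.Println("scanning while the engine is guaranteed open")
    }); err != nil {
        fmt.Println(err)
    }
}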
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/rangefeed/processor_test.go
@@ -134,7 +134,7 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *roachpb.RangeFeed
const testProcessorEventCCap = 16

func newTestProcessorWithTxnPusher(
-rtsIter storage.SimpleIterator, txnPusher TxnPusher,
+rtsIter storage.SimpleMVCCIterator, txnPusher TxnPusher,
) (*Processor, *stop.Stopper) {
stopper := stop.NewStopper()

@@ -158,14 +158,14 @@ func newTestProcessorWithTxnPusher(
return p, stopper
}

-func makeIteratorConstructor(rtsIter storage.SimpleIterator) IteratorConstructor {
+func makeIteratorConstructor(rtsIter storage.SimpleMVCCIterator) IteratorConstructor {
if rtsIter == nil {
return nil
}
-return func() storage.SimpleIterator { return rtsIter }
+return func() storage.SimpleMVCCIterator { return rtsIter }
}

-func newTestProcessor(rtsIter storage.SimpleIterator) (*Processor, *stop.Stopper) {
+func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
return newTestProcessorWithTxnPusher(rtsIter, nil /* pusher */)
}

6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -55,7 +55,7 @@ type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
-catchupIterConstructor func() storage.SimpleIterator
+catchupIterConstructor func() storage.SimpleMVCCIterator
withDiff bool
metrics *Metrics

@@ -86,7 +86,7 @@ type registration struct {
func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
-catchupIterConstructor func() storage.SimpleIterator,
+catchupIterConstructor func() storage.SimpleMVCCIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
@@ -295,7 +295,7 @@ func (r *registration) maybeRunCatchupScan() error {
startKey := storage.MakeMVCCMetadataKey(r.span.Key)
endKey := storage.MakeMVCCMetadataKey(r.span.EndKey)

-// Iterator will encounter historical values for each key in
+// MVCCIterator will encounter historical values for each key in
// reverse-chronological order. To output in chronological order, store
// events for the same key until a different key is encountered, then output
// the encountered values in reverse. This also allows us to buffer events
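
The maybeRunCatchupScan comment above describes a small algorithm: the scan surfaces each key's versions newest-first, but events must be emitted oldest-first. A self-contained sketch of the per-key buffering it describes (the types and input shape are illustrative assumptions):

package main

import "fmt"

// versioned stands in for a decoded MVCC key/value; iteration yields all
// versions of a key in reverse-chronological order before the next key.
type versioned struct {
    key string
    ts  int
}

// emitChronological buffers versions of the current key and flushes them in
// reverse (i.e. chronological) order whenever the key changes.
func emitChronological(in []versioned, emit func(versioned)) {
    var buf []versioned
    flush := func() {
        for i := len(buf) - 1; i >= 0; i-- {
            emit(buf[i])
        }
        buf = buf[:0]
    }
    for _, v := range in {
        if len(buf) > 0 && buf[len(buf)-1].key != v.key {
            flush() // new key: output the previous key's versions, oldest first
        }
        buf = append(buf, v)
    }
    flush()
}

func main() {
    // Two keys, with versions listed newest-first as an MVCC scan yields them.
    in := []versioned{{"a", 3}, {"a", 1}, {"b", 2}}
    emitChronological(in, func(v versioned) { fmt.Println(v.key, v.ts) })
    // Output: a 1, a 3, b 2
}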
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/registry_test.go
@@ -96,7 +96,7 @@ type testRegistration struct {
}

func newTestRegistration(
-span roachpb.Span, ts hlc.Timestamp, catchup storage.SimpleIterator, withDiff bool,
+span roachpb.Span, ts hlc.Timestamp, catchup storage.SimpleMVCCIterator, withDiff bool,
) *testRegistration {
s := newTestStream()
errC := make(chan *roachpb.Error, 1)
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/rangefeed/task.go
@@ -37,19 +37,19 @@ type runnable interface {
// The Processor can initialize its resolvedTimestamp once the scan completes
// because it knows it is now tracking all intents in its key range.
//
-// Iterator Contract:
-// The provided Iterator must observe all intents in the Processor's keyspan.
+// MVCCIterator Contract:
+// The provided MVCCIterator must observe all intents in the Processor's keyspan.
// An important implication of this is that if the iterator is a
// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
// known resolved timestamp, if one has ever been recorded. If one has never
// been recorded, the TimeBoundIterator cannot have any lower bound.
//
type initResolvedTSScan struct {
p *Processor
-it storage.SimpleIterator
+it storage.SimpleMVCCIterator
}

-func newInitResolvedTSScan(p *Processor, it storage.SimpleIterator) runnable {
+func newInitResolvedTSScan(p *Processor, it storage.SimpleMVCCIterator) runnable {
return &initResolvedTSScan{p: p, it: it}
}

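
Restated, the contract above fixes the largest admissible lower bound for a time-bound iterator feeding this scan. A toy sketch of that choice (plain ints instead of hlc.Timestamp; the function name is hypothetical):

package main

import "fmt"

// chooseMinTimestamp picks the largest MinTimestamp the contract permits.
func chooseMinTimestamp(resolvedTS int, recorded bool) int {
    if !recorded {
        // No resolved timestamp has ever been recorded, so the iterator may
        // not have any lower bound: every intent must be observed.
        return 0
    }
    // MinTimestamp may not be above the largest known resolved timestamp.
    return resolvedTS
}

func main() {
    fmt.Println(chooseMinTimestamp(10, true))  // 10
    fmt.Println(chooseMinTimestamp(10, false)) // 0
}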
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
@@ -27,7 +27,7 @@ type KeyRange struct {
// The ranges keyRange slice specifies the key ranges which comprise
// all of the range's data.
//
-// A ReplicaDataIterator provides a subset of the engine.Iterator interface.
+// A ReplicaDataIterator provides a subset of the engine.MVCCIterator interface.
//
// TODO(tschottdorf): the API is awkward. By default, ReplicaDataIterator uses
// a byte allocator which needs to be reset manually using `ResetAllocator`.
@@ -37,7 +37,7 @@ type KeyRange struct {
type ReplicaDataIterator struct {
curIndex int
ranges []KeyRange
-it storage.Iterator
+it storage.MVCCIterator
a bufalloc.ByteAllocator
}

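
Per the comment above, ReplicaDataIterator presents several disjoint key ranges as a single iteration, tracked by its curIndex and ranges fields. A self-contained sketch of that chaining shape (string keys stand in for engine iteration; illustrative only):

package main

import "fmt"

// keyRange stands in for rditer.KeyRange; keys are plain strings here.
type keyRange struct{ keys []string }

// chainIterator walks a fixed list of key ranges in order, advancing to the
// next range when the current one is exhausted; this matches the shape of
// ReplicaDataIterator's curIndex/ranges fields.
type chainIterator struct {
    curIndex int
    ranges   []keyRange
    pos      int
}

func (it *chainIterator) valid() bool {
    // Skip exhausted (or empty) ranges until a key is found.
    for it.curIndex < len(it.ranges) && it.pos >= len(it.ranges[it.curIndex].keys) {
        it.curIndex++
        it.pos = 0
    }
    return it.curIndex < len(it.ranges)
}

func (it *chainIterator) key() string { return it.ranges[it.curIndex].keys[it.pos] }

func (it *chainIterator) next() { it.pos++ }

func main() {
    it := &chainIterator{ranges: []keyRange{
        {keys: []string{"/Local/RangeID/1/a"}},
        {keys: []string{}}, // empty ranges are skipped
        {keys: []string{"/Table/1/k1", "/Table/1/k2"}},
    }}
    for ; it.valid(); it.next() {
        fmt.Println(it.key())
    }
}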
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/replica.go
@@ -483,11 +483,6 @@ type Replica struct {
// depending on which lock is being held.
stateLoader stateloader.StateLoader

-// draining specifies whether this replica is draining. Raft leadership
-// transfers due to a lease change will be attempted even if the target does
-// not have all the log entries.
-draining bool
-
// cachedProtectedTS provides the state of the protected timestamp
// subsystem as used on the request serving path to determine the effective
// gc threshold given the current TTL when using strict GC enforcement.
@@ -1606,7 +1601,7 @@ func (r *Replica) maybeTransferRaftLeadershipToLeaseholderLocked(ctx context.Con
}
lhReplicaID := uint64(lease.Replica.ReplicaID)
lhProgress, ok := raftStatus.Progress[lhReplicaID]
-if (ok && lhProgress.Match >= raftStatus.Commit) || r.mu.draining {
+if (ok && lhProgress.Match >= raftStatus.Commit) || r.store.IsDraining() {
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", lhReplicaID)
r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
r.mu.internalRaftGroup.TransferLeader(lhReplicaID)
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -112,12 +112,12 @@ func (tp *rangefeedTxnPusher) ResolveIntents(
}

type iteratorWithCloser struct {
-storage.SimpleIterator
+storage.SimpleMVCCIterator
close func()
}

func (i iteratorWithCloser) Close() {
-i.SimpleIterator.Close()
+i.SimpleMVCCIterator.Close()
i.close()
}

@@ -199,7 +199,7 @@ func (r *Replica) RangeFeed(
// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
-catchUpIterFunc = func() storage.SimpleIterator {
+catchUpIterFunc = func() storage.SimpleMVCCIterator {

innerIter := r.Engine().NewIterator(storage.IterOptions{
UpperBound: args.Span.EndKey,
@@ -213,8 +213,8 @@
// MinTimestampHint: args.Timestamp,
})
catchUpIter := iteratorWithCloser{
-SimpleIterator: innerIter,
-close:          iterSemRelease,
+SimpleMVCCIterator: innerIter,
+close:              iterSemRelease,
}
return catchUpIter
}
@@ -346,7 +346,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
p = rangefeed.NewProcessor(cfg)

// Start it with an iterator to initialize the resolved timestamp.
-rtsIter := func() storage.SimpleIterator {
+rtsIter := func() storage.SimpleMVCCIterator {
return r.Engine().NewIterator(storage.IterOptions{
UpperBound: desc.EndKey.AsRawKey(),
// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
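
The iteratorWithCloser wrapper changed in this file exists so that closing the catch-up iterator also runs an extra callback, here releasing the semaphore slot acquired for it. A self-contained sketch of the pattern with stand-in types (not the kvserver ones):

package main

import "fmt"

type simpleIterator interface{ Close() }

type engineIter struct{}

func (engineIter) Close() { fmt.Println("engine iterator closed") }

// iteratorWithCloser mirrors the wrapper in the diff: it embeds the real
// iterator and chains an extra callback onto Close.
type iteratorWithCloser struct {
    simpleIterator
    close func()
}

func (i iteratorWithCloser) Close() {
    i.simpleIterator.Close()
    i.close()
}

func main() {
    // A buffered channel acts as a counting semaphore limiting how many
    // catch-up iterators may be open at once.
    sem := make(chan struct{}, 1)
    sem <- struct{}{} // acquire a slot before creating the iterator

    var it simpleIterator = iteratorWithCloser{
        simpleIterator: engineIter{},
        close:          func() { <-sem; fmt.Println("semaphore released") },
    }
    it.Close() // closes the engine iterator, then releases the slot
}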
[Diffs for the remaining 20 files not shown.]