Skip to content

Commit

Permalink
db: enhance Iterator with limited iteration mode
Browse files Browse the repository at this point in the history
Limited iteration mode is a best-effort way for the caller
to specify an exclusive forward or inclusive reverse limit
on each limited-iteration call. The limit lasts only for
the duration of the call. The limit is exclusive for
forward iteration and inclusive for reverse iteration.

Iterator supports WithLimit variants for SeekGE, SeekLT,
Next, Prev. These are motivated by the O(N^2) complexity
we observe when doing coordinated iteration over the
lock table and MVCC keys in CockroachDB where all the
intents in the lock table are deleted but have not yet
been compacted away. The O(N^2) complexity arises in two
cases:
- Limited scans: the extreme is a limited scan with a
  limit of 1. Each successive scan traverses N, N-1, ...
  locks.
- Mix of forward/reverse iteration in a single scan: A
  Next/SeekGE will incur a cost of N and the Prev/SeekLT
  another cost of N.
  This situation could potentially be fixed with a narrower
  solution, given the constrained way that the combination of
  forward and backward iteration is used in CockroachDB
  code, but it would be fragile. The limited iteration mode
  solves it in a more general way.

The implementation of limited iteration eschews
best-effort behavior (though that would reduce some
comparisons) so that it can be deterministic and be
tested via the metamorphic test.

The existing seek optimizations are impacted in the
following way:
- Use next before seeking:
  - Repeated seeks (forward or reverse) with monotonic
    bounds: this optimization operates at mergingIter
    and below and is therefore unaffected by whatever
    Iterator may be doing with the keys exposed by
    mergingIter.
  - monotonic SeekPrefixGE: SeekPrefixGE does not
    expose a limited iteration variant since the
    prefix comparison performed by Iterator already
    ensures the absence of quadratic behavior.
- Noop seeks (forward and reverse): this optimization
  happens purely in Iterator. It is altered to account
  for limited iteration, and forks into 2 paths,
  (a) truly a noop, (b) the limit on the preceding
  seek has positioned the iterator at a key that is
  outside the limit. In case (b) the underlying
  mergingIter does not need to be seeked, but there
  is work to be done in finding an appropriate
  prev or next entry.

See https://github.com/sumeerbhola/cockroach/tree/iter_coord
for the corresponding CockroachDB change that uses
these methods.
  • Loading branch information
sumeerbhola committed Mar 13, 2021
1 parent 958290b commit 881bb69
Show file tree
Hide file tree
Showing 10 changed files with 707 additions and 160 deletions.
44 changes: 42 additions & 2 deletions data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
if len(parts) == 0 {
continue
}
printValidityState := false
var valid bool
var validityState IterValidityState
switch parts[0] {
case "seek-ge":
if len(parts) != 2 {
Expand All @@ -90,6 +92,32 @@ func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
return "seek-lt <key>\n"
}
valid = iter.SeekLT([]byte(strings.TrimSpace(parts[1])))
case "seek-ge-limit":
if len(parts) != 3 {
return "seek-ge-limit <key> <limit>\n"
}
validityState = iter.SeekGEWithLimit(
[]byte(strings.TrimSpace(parts[1])), []byte(strings.TrimSpace(parts[2])))
printValidityState = true
case "seek-lt-limit":
if len(parts) != 3 {
return "seek-lt-limit <key> <limit>\n"
}
validityState = iter.SeekLTWithLimit(
[]byte(strings.TrimSpace(parts[1])), []byte(strings.TrimSpace(parts[2])))
printValidityState = true
case "next-limit":
if len(parts) != 2 {
return "next-limit <limit>\n"
}
validityState = iter.NextWithLimit([]byte(strings.TrimSpace(parts[1])))
printValidityState = true
case "prev-limit":
if len(parts) != 2 {
return "prev-limit <limit>\n"
}
validityState = iter.PrevWithLimit([]byte(strings.TrimSpace(parts[1])))
printValidityState = true
case "first":
valid = iter.First()
case "last":
Expand Down Expand Up @@ -130,14 +158,26 @@ func runIterCmd(d *datadriven.TestData, iter *Iterator, closeIter bool) string {
default:
return fmt.Sprintf("unknown op: %s", parts[0])
}
var validityStateStr string
if printValidityState {
valid = validityState == IterValid
switch validityState {
case IterExhausted:
validityStateStr = " exhausted"
case IterValid:
validityStateStr = " valid"
case IterAtLimit:
validityStateStr = " at-limit"
}
}
if err := iter.Error(); err != nil {
fmt.Fprintf(&b, "err=%v\n", err)
} else if valid != iter.Valid() {
fmt.Fprintf(&b, "mismatched valid states: %t vs %t\n", valid, iter.Valid())
} else if valid {
fmt.Fprintf(&b, "%s:%s\n", iter.Key(), iter.Value())
fmt.Fprintf(&b, "%s:%s%s\n", iter.Key(), iter.Value(), validityStateStr)
} else {
fmt.Fprintf(&b, ".\n")
fmt.Fprintf(&b, ".%s\n", validityStateStr)
}
}
return b.String()
Expand Down
66 changes: 37 additions & 29 deletions internal/metamorphic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ const (
iterFirst
iterLast
iterNext
iterNextWithLimit
iterPrev
iterPrevWithLimit
iterSeekGE
iterSeekGEWithLimit
iterSeekLT
iterSeekLTWithLimit
iterSeekPrefixGE
iterSetBounds
newBatch
Expand Down Expand Up @@ -56,34 +60,38 @@ type config struct {
var defaultConfig = config{
// dbClose is not in this list since it is deterministically generated once, at the end of the test.
ops: []int{
batchAbort: 5,
batchCommit: 5,
dbCheckpoint: 1,
dbCompact: 1,
dbFlush: 2,
dbRestart: 2,
iterClose: 5,
iterFirst: 100,
iterLast: 100,
iterNext: 100,
iterPrev: 100,
iterSeekGE: 100,
iterSeekLT: 100,
iterSeekPrefixGE: 100,
iterSetBounds: 100,
newBatch: 5,
newIndexedBatch: 5,
newIter: 10,
newIterUsingClone: 5,
newSnapshot: 10,
readerGet: 100,
snapshotClose: 10,
writerApply: 10,
writerDelete: 100,
writerDeleteRange: 50,
writerIngest: 100,
writerMerge: 100,
writerSet: 100,
writerSingleDelete: 25,
batchAbort: 5,
batchCommit: 5,
dbCheckpoint: 1,
dbCompact: 1,
dbFlush: 2,
dbRestart: 2,
iterClose: 5,
iterFirst: 100,
iterLast: 100,
iterNext: 100,
iterNextWithLimit: 20,
iterPrev: 100,
iterPrevWithLimit: 20,
iterSeekGE: 100,
iterSeekGEWithLimit: 20,
iterSeekLT: 100,
iterSeekLTWithLimit: 20,
iterSeekPrefixGE: 100,
iterSetBounds: 100,
newBatch: 5,
newIndexedBatch: 5,
newIter: 10,
newIterUsingClone: 5,
newSnapshot: 10,
readerGet: 100,
snapshotClose: 10,
writerApply: 10,
writerDelete: 100,
writerDeleteRange: 50,
writerIngest: 100,
writerMerge: 100,
writerSet: 100,
writerSingleDelete: 25,
},
}
116 changes: 87 additions & 29 deletions internal/metamorphic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,35 +93,39 @@ func generate(rng *rand.Rand, count uint64, cfg config) []op {
g := newGenerator(rng)

generators := []func(){
batchAbort: g.batchAbort,
batchCommit: g.batchCommit,
dbCheckpoint: g.dbCheckpoint,
dbCompact: g.dbCompact,
dbFlush: g.dbFlush,
dbRestart: g.dbRestart,
iterClose: g.iterClose,
iterFirst: g.iterFirst,
iterLast: g.iterLast,
iterNext: g.iterNext,
iterPrev: g.iterPrev,
iterSeekGE: g.iterSeekGE,
iterSeekLT: g.iterSeekLT,
iterSeekPrefixGE: g.iterSeekPrefixGE,
iterSetBounds: g.iterSetBounds,
newBatch: g.newBatch,
newIndexedBatch: g.newIndexedBatch,
newIter: g.newIter,
newIterUsingClone: g.newIterUsingClone,
newSnapshot: g.newSnapshot,
readerGet: g.readerGet,
snapshotClose: g.snapshotClose,
writerApply: g.writerApply,
writerDelete: g.writerDelete,
writerDeleteRange: g.writerDeleteRange,
writerIngest: g.writerIngest,
writerMerge: g.writerMerge,
writerSet: g.writerSet,
writerSingleDelete: g.writerSingleDelete,
batchAbort: g.batchAbort,
batchCommit: g.batchCommit,
dbCheckpoint: g.dbCheckpoint,
dbCompact: g.dbCompact,
dbFlush: g.dbFlush,
dbRestart: g.dbRestart,
iterClose: g.iterClose,
iterFirst: g.iterFirst,
iterLast: g.iterLast,
iterNext: g.iterNext,
iterNextWithLimit: g.iterNextWithLimit,
iterPrev: g.iterPrev,
iterPrevWithLimit: g.iterPrevWithLimit,
iterSeekGE: g.iterSeekGE,
iterSeekGEWithLimit: g.iterSeekGEWithLimit,
iterSeekLT: g.iterSeekLT,
iterSeekLTWithLimit: g.iterSeekLTWithLimit,
iterSeekPrefixGE: g.iterSeekPrefixGE,
iterSetBounds: g.iterSetBounds,
newBatch: g.newBatch,
newIndexedBatch: g.newIndexedBatch,
newIter: g.newIter,
newIterUsingClone: g.newIterUsingClone,
newSnapshot: g.newSnapshot,
readerGet: g.readerGet,
snapshotClose: g.snapshotClose,
writerApply: g.writerApply,
writerDelete: g.writerDelete,
writerDeleteRange: g.writerDeleteRange,
writerIngest: g.writerIngest,
writerMerge: g.writerMerge,
writerSet: g.writerSet,
writerSingleDelete: g.writerSingleDelete,
}

// TPCC-style deck of cards randomization. Every time the end of the deck is
Expand Down Expand Up @@ -548,6 +552,22 @@ func (g *generator) iterSeekGE() {
})
}

// iterSeekGEWithLimit generates a limited SeekGE operation on a randomly
// chosen live iterator. The two random keys are ordered so that the seek
// key never exceeds the limit, matching the forward-iteration semantics
// where the limit lies at or beyond the seek position.
func (g *generator) iterSeekGEWithLimit() {
	if len(g.liveIters) == 0 {
		return
	}
	// Draw the seek key and the limit, each with a 0.1% chance of being a
	// previously unseen key.
	lower := g.randKey(0.001)
	upper := g.randKey(0.001)
	// Order the pair so that lower <= upper before using them as key/limit.
	if bytes.Compare(lower, upper) > 0 {
		lower, upper = upper, lower
	}
	g.add(&iterSeekGEOp{
		iterID: g.liveIters.rand(g.rng),
		key:    lower,
		limit:  upper,
	})
}

func (g *generator) iterSeekPrefixGE() {
if len(g.liveIters) == 0 {
return
Expand All @@ -570,6 +590,22 @@ func (g *generator) iterSeekLT() {
})
}

// iterSeekLTWithLimit generates a limited SeekLT operation on a randomly
// chosen live iterator. The two random keys are ordered so that the
// (inclusive, reverse-iteration) limit never exceeds the seek key.
func (g *generator) iterSeekLTWithLimit() {
	if len(g.liveIters) == 0 {
		return
	}
	// Draw the seek key and the limit, each with a 0.1% chance of being a
	// previously unseen key.
	upper := g.randKey(0.001)
	lower := g.randKey(0.001)
	// For reverse iteration the limit must sit at or below the seek key,
	// so swap the pair if it is out of order.
	if bytes.Compare(upper, lower) < 0 {
		upper, lower = lower, upper
	}
	g.add(&iterSeekLTOp{
		iterID: g.liveIters.rand(g.rng),
		key:    upper,
		limit:  lower,
	})
}

func (g *generator) iterFirst() {
if len(g.liveIters) == 0 {
return
Expand Down Expand Up @@ -600,6 +636,17 @@ func (g *generator) iterNext() {
})
}

// iterNextWithLimit generates a limited Next operation on a randomly
// chosen live iterator; the limit key has a 0.1% chance of being a
// previously unseen key.
func (g *generator) iterNextWithLimit() {
	if len(g.liveIters) == 0 {
		return
	}
	op := &iterNextOp{
		iterID: g.liveIters.rand(g.rng),
		// 0.1% chance of generating a brand-new key as the limit.
		limit: g.randKey(0.001),
	}
	g.add(op)
}

func (g *generator) iterPrev() {
if len(g.liveIters) == 0 {
return
Expand All @@ -610,6 +657,17 @@ func (g *generator) iterPrev() {
})
}

// iterPrevWithLimit generates a limited Prev operation on a randomly
// chosen live iterator; the limit key has a 0.1% chance of being a
// previously unseen key.
func (g *generator) iterPrevWithLimit() {
	if len(g.liveIters) == 0 {
		return
	}
	op := &iterPrevOp{
		iterID: g.liveIters.rand(g.rng),
		// 0.1% chance of generating a brand-new key as the limit.
		limit: g.randKey(0.001),
	}
	g.add(op)
}

func (g *generator) readerGet() {
if len(g.liveReaders) == 0 {
return
Expand Down
9 changes: 8 additions & 1 deletion internal/metamorphic/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ var (
"the directory storing test state")
disk = flag.Bool("disk", false,
"whether to use an in-mem DB or on-disk (in-mem is significantly faster)")
// TODO: default error rate to a non-zero value
// TODO: default error rate to a non-zero value. Currently, retrying is
// non-deterministic because of the Iterator.*WithLimit() methods since
// they may say that the Iterator is not valid, but be positioned at a
// certain key that can be returned in the future if the limit is changed.
// Since that key is hidden from clients of Iterator, the retryableIter
// using SeekGE will not necessarily position the Iterator that saw an
// injected error at the same place as an Iterator that did not see that
// error.
errorRate = flag.Float64("error-rate", 0.0,
"rate of errors injected into filesystem operations (0 ≤ r < 1)")
failRE = flag.String("fail", "",
Expand Down
Loading

0 comments on commit 881bb69

Please sign in to comment.