Skip to content

Commit

Permalink
base: add SeqNum type
Browse files Browse the repository at this point in the history
We add a `SeqNum` type for sequence numbers. This makes the code more
self-documenting, and will allow a custom `String` implementation.
  • Loading branch information
RaduBerinde committed Jun 22, 2024
1 parent 90d691e commit ff83363
Show file tree
Hide file tree
Showing 85 changed files with 465 additions and 427 deletions.
28 changes: 14 additions & 14 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ type batchInternal struct {
// tombstonesSeqNum. This is the case for all new iterators created over a
// batch but it's not the case for all cloned iterators.
tombstones []keyspan.Span
tombstonesSeqNum uint64
tombstonesSeqNum base.SeqNum

// Fragmented range key spans. Cached the first time a range key iterator is
// requested. The cache is invalidated whenever a new range key
Expand All @@ -326,7 +326,7 @@ type batchInternal struct {
// tombstonesSeqNum. This is the case for all new iterators created over a
// batch but it's not the case for all cloned iterators.
rangeKeys []keyspan.Span
rangeKeysSeqNum uint64
rangeKeysSeqNum base.SeqNum

// The flushableBatch wrapper if the batch is too large to fit in the
// memtable.
Expand Down Expand Up @@ -487,8 +487,8 @@ func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch {
// with the base.InternalKeySeqNumBatch bit. These sequence numbers are only
// used during iteration, and the keys are assigned ordinary sequence numbers
// when the batch is committed.
func (b *Batch) nextSeqNum() uint64 {
return uint64(len(b.data)) | base.InternalKeySeqNumBatch
func (b *Batch) nextSeqNum() base.SeqNum {
return base.SeqNum(len(b.data)) | base.InternalKeySeqNumBatch
}

func (b *Batch) release() {
Expand Down Expand Up @@ -1310,15 +1310,15 @@ func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
}
}

func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
// Construct an iterator even if rangeDelIndex is nil, because it is allowed
// to refresh later, so we need the container to exist.
iter := new(keyspan.Iter)
b.initRangeDelIter(o, iter, batchSnapshot)
return iter
}

func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
if b.rangeDelIndex == nil {
iter.Init(b.cmp, nil)
return
Expand Down Expand Up @@ -1386,15 +1386,15 @@ func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int)
frag.Finish()
}

func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter {
func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter {
// Construct an iterator even if rangeKeyIndex is nil, because it is allowed
// to refresh later, so we need the container to exist.
iter := new(keyspan.Iter)
b.initRangeKeyIter(o, iter, batchSnapshot)
return iter
}

func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) {
func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
if b.rangeKeyIndex == nil {
iter.Init(b.cmp, nil)
return
Expand Down Expand Up @@ -1596,14 +1596,14 @@ func (b *Batch) grow(n int) {
b.data = b.data[:newSize]
}

func (b *Batch) setSeqNum(seqNum uint64) {
func (b *Batch) setSeqNum(seqNum base.SeqNum) {
batchrepr.SetSeqNum(b.data, seqNum)
}

// SeqNum returns the batch sequence number which is applied to the first
// record in the batch. The sequence number is incremented for each subsequent
// record. It returns zero if the batch is empty.
func (b *Batch) SeqNum() uint64 {
func (b *Batch) SeqNum() base.SeqNum {
if len(b.data) == 0 {
b.init(batchrepr.HeaderLen)
}
Expand Down Expand Up @@ -1666,7 +1666,7 @@ type batchIter struct {
// read. This sequence number has the InternalKeySeqNumBatch bit set, so it
// encodes an offset within the batch. Only batch entries earlier than the
// offset are visible during iteration.
snapshot uint64
snapshot base.SeqNum
}

// batchIter implements the base.InternalIterator interface.
Expand Down Expand Up @@ -1849,7 +1849,7 @@ type flushableBatch struct {

// The base sequence number for the entries in the batch. This is the same
// value as Batch.seqNum() and is cached here for performance.
seqNum uint64
seqNum base.SeqNum

// A slice of offsets and indices for the entries in the batch. Used to
// implement flushableBatchIter. Unlike the indexing on a normal batch, a
Expand Down Expand Up @@ -1997,7 +1997,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error
return b, nil
}

func (b *flushableBatch) setSeqNum(seqNum uint64) {
func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) {
if b.seqNum != 0 {
panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum))
}
Expand Down Expand Up @@ -2266,7 +2266,7 @@ func (i *flushableBatchIter) getKey(index int) InternalKey {
e := &i.offsets[index]
kind := InternalKeyKind(i.data[e.offset])
key := i.data[e.keyStart:e.keyEnd]
return base.MakeInternalKey(key, i.batch.seqNum+uint64(e.index), kind)
return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind)
}

func (i *flushableBatchIter) getKV(index int) *base.InternalKV {
Expand Down
12 changes: 4 additions & 8 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"io"
"math"
"math/rand"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -292,7 +291,7 @@ func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) {
require.True(t, b.Empty())
b = newBatchWithSize(nil, size)

require.Equal(t, uint64(0), b.SeqNum())
require.Equal(t, base.SeqNumZero, b.SeqNum())
require.True(t, b.Empty())
b.Reset()
require.True(t, b.Empty())
Expand Down Expand Up @@ -396,7 +395,7 @@ func TestBatchReset(t *testing.T) {
require.Equal(t, uint64(0), b.countRangeDels)
require.Equal(t, uint64(0), b.countRangeKeys)
require.Equal(t, batchrepr.HeaderLen, len(b.data))
require.Equal(t, uint64(0), b.SeqNum())
require.Equal(t, base.SeqNumZero, b.SeqNum())
require.Equal(t, uint64(0), b.memTableSize)
require.Equal(t, FormatMajorVersion(0x00), b.minimumFormatMajorVersion)
require.Equal(t, b.deferredOp, DeferredBatchOp{})
Expand Down Expand Up @@ -1188,11 +1187,8 @@ func TestFlushableBatch(t *testing.T) {
if len(d.CmdArgs) != 1 || len(d.CmdArgs[0].Vals) != 1 || d.CmdArgs[0].Key != "seq" {
return "dump seq=<value>\n"
}
seqNum, err := strconv.Atoi(d.CmdArgs[0].Vals[0])
if err != nil {
return err.Error()
}
b.setSeqNum(uint64(seqNum))
seqNum := base.ParseSeqNum(d.CmdArgs[0].Vals[0])
b.setSeqNum(seqNum)

var buf bytes.Buffer

Expand Down
6 changes: 3 additions & 3 deletions batchrepr/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func ReadHeader(repr []byte) (h Header, ok bool) {
type Header struct {
// SeqNum is the sequence number at which the batch is committed. A batch
// that has not yet committed will have a zero sequence number.
SeqNum uint64
SeqNum base.SeqNum
// Count is the count of keys written to the batch.
Count uint32
}
Expand All @@ -62,8 +62,8 @@ func (h Header) String() string {
// does not validate that the repr is valid. It's exported only for very
// performance sensitive code paths that should not necessarily read the rest of
// the header as well.
func ReadSeqNum(repr []byte) uint64 {
return binary.LittleEndian.Uint64(repr[:countOffset])
func ReadSeqNum(repr []byte) base.SeqNum {
return base.SeqNum(binary.LittleEndian.Uint64(repr[:countOffset]))
}

// Read constructs a Reader from an encoded batch representation, ignoring the
Expand Down
10 changes: 7 additions & 3 deletions batchrepr/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

package batchrepr

import "encoding/binary"
import (
"encoding/binary"

"github.com/cockroachdb/pebble/internal/base"
)

// SetSeqNum mutates the provided batch representation, storing the provided
// sequence number in its header. The provided byte slice must already be at
// least HeaderLen bytes long or else SetSeqNum will panic.
func SetSeqNum(repr []byte, seqNum uint64) {
binary.LittleEndian.PutUint64(repr[:countOffset], seqNum)
func SetSeqNum(repr []byte, seqNum base.SeqNum) {
binary.LittleEndian.PutUint64(repr[:countOffset], uint64(seqNum))
}

// SetCount mutates the provided batch representation, storing the provided
Expand Down
6 changes: 2 additions & 4 deletions batchrepr/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/cockroachdb/datadriven"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/binfmt"
)

Expand Down Expand Up @@ -37,10 +38,7 @@ func TestWriter(t *testing.T) {
return prettyBinaryRepr(repr)

case "set-seqnum":
seqNum, err := strconv.ParseUint(td.CmdArgs[0].Key, 10, 64)
if err != nil {
return err.Error()
}
seqNum := base.ParseSeqNum(td.CmdArgs[0].Key)
SetSeqNum(repr, seqNum)
return prettyBinaryRepr(repr)

Expand Down
13 changes: 7 additions & 6 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/record"
)

Expand Down Expand Up @@ -126,10 +127,10 @@ func (q *commitQueue) dequeueApplied() *Batch {
type commitEnv struct {
// The next sequence number to give to a batch. Protected by
// commitPipeline.mu.
logSeqNum *atomic.Uint64
logSeqNum *base.AtomicSeqNum
// The visible sequence number at which reads should be performed. Ratcheted
// upwards atomically as batches are applied to the memtable.
visibleSeqNum *atomic.Uint64
visibleSeqNum *base.AtomicSeqNum

// Apply the batch to the specified memtable. Called concurrently.
apply func(b *Batch, mem *memTable) error
Expand Down Expand Up @@ -360,7 +361,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
// invoked with commitPipeline.mu held, but note that DB.mu is not held and
// must be locked if necessary.
func (p *commitPipeline) AllocateSeqNum(
count int, prepare func(seqNum uint64), apply func(seqNum uint64),
count int, prepare func(seqNum base.SeqNum), apply func(seqNum base.SeqNum),
) {
// This method is similar to Commit and prepare. Be careful about trying to
// share additional code with those methods because Commit and prepare are
Expand All @@ -387,7 +388,7 @@ func (p *commitPipeline) AllocateSeqNum(
// Assign the batch a sequence number. Note that we use atomic operations
// here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
// mutual exclusion for other goroutines writing to logSeqNum.
logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count)
logSeqNum := p.env.logSeqNum.Add(base.SeqNum(count)) - base.SeqNum(count)
seqNum := logSeqNum
if seqNum == 0 {
// We can't use the value 0 for the global seqnum during ingestion, because
Expand Down Expand Up @@ -461,7 +462,7 @@ func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memT
// Assign the batch a sequence number. Note that we use atomic operations
// here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
// mutual exclusion for other goroutines writing to logSeqNum.
b.setSeqNum(p.env.logSeqNum.Add(n) - n)
b.setSeqNum(p.env.logSeqNum.Add(base.SeqNum(n)) - base.SeqNum(n))

// Write the data to the WAL.
mem, err := p.env.write(b, syncWG, syncErr)
Expand Down Expand Up @@ -502,7 +503,7 @@ func (p *commitPipeline) publish(b *Batch) {
// that the sequence number ratchets up.
for {
curSeqNum := p.env.visibleSeqNum.Load()
newSeqNum := t.SeqNum() + uint64(t.Count())
newSeqNum := t.SeqNum() + base.SeqNum(t.Count())
if newSeqNum <= curSeqNum {
// t's sequence number has already been published.
break
Expand Down
30 changes: 15 additions & 15 deletions commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
)

type testCommitEnv struct {
logSeqNum atomic.Uint64
visibleSeqNum atomic.Uint64
logSeqNum base.AtomicSeqNum
visibleSeqNum base.AtomicSeqNum
writeCount atomic.Uint64
applyBuf struct {
sync.Mutex
Expand All @@ -45,7 +45,7 @@ func (e *testCommitEnv) env() commitEnv {

func (e *testCommitEnv) apply(b *Batch, mem *memTable) error {
e.applyBuf.Lock()
e.applyBuf.buf = append(e.applyBuf.buf, b.SeqNum())
e.applyBuf.buf = append(e.applyBuf.buf, uint64(b.SeqNum()))
e.applyBuf.Unlock()
return nil
}
Expand Down Expand Up @@ -115,10 +115,10 @@ func TestCommitPipeline(t *testing.T) {
t.Fatalf("expected %d written batches, but found %d",
n, len(e.applyBuf.buf))
}
if s := e.logSeqNum.Load(); uint64(n) != s {
if s := e.logSeqNum.Load(); base.SeqNum(n) != s {
t.Fatalf("expected %d, but found %d", n, s)
}
if s := e.visibleSeqNum.Load(); uint64(n) != s {
if s := e.visibleSeqNum.Load(); base.SeqNum(n) != s {
t.Fatalf("expected %d, but found %d", n, s)
}
}
Expand Down Expand Up @@ -160,10 +160,10 @@ func TestCommitPipelineSync(t *testing.T) {
t.Fatalf("expected %d written batches, but found %d",
n, len(e.applyBuf.buf))
}
if s := e.logSeqNum.Load(); uint64(n) != s {
if s := e.logSeqNum.Load(); base.SeqNum(n) != s {
t.Fatalf("expected %d, but found %d", n, s)
}
if s := e.visibleSeqNum.Load(); uint64(n) != s {
if s := e.visibleSeqNum.Load(); base.SeqNum(n) != s {
t.Fatalf("expected %d, but found %d", n, s)
}
})
Expand All @@ -182,9 +182,9 @@ func TestCommitPipelineAllocateSeqNum(t *testing.T) {
for i := 1; i <= n; i++ {
go func(i int) {
defer wg.Done()
p.AllocateSeqNum(i, func(_ uint64) {
p.AllocateSeqNum(i, func(_ base.SeqNum) {
prepareCount.Add(1)
}, func(seqNum uint64) {
}, func(_ base.SeqNum) {
applyCount.Add(1)
})
}(i)
Expand Down Expand Up @@ -238,8 +238,8 @@ func TestCommitPipelineWALClose(t *testing.T) {
var wal *record.LogWriter
var walDone sync.WaitGroup
testEnv := commitEnv{
logSeqNum: new(atomic.Uint64),
visibleSeqNum: new(atomic.Uint64),
logSeqNum: new(base.AtomicSeqNum),
visibleSeqNum: new(base.AtomicSeqNum),
apply: func(b *Batch, mem *memTable) error {
// At this point, we've called SyncRecord but the sync is blocked.
walDone.Done()
Expand Down Expand Up @@ -302,8 +302,8 @@ func TestCommitPipelineWALClose(t *testing.T) {
func TestCommitPipelineLogDataSeqNum(t *testing.T) {
var testEnv commitEnv
testEnv = commitEnv{
logSeqNum: new(atomic.Uint64),
visibleSeqNum: new(atomic.Uint64),
logSeqNum: new(base.AtomicSeqNum),
visibleSeqNum: new(base.AtomicSeqNum),
apply: func(b *Batch, mem *memTable) error {
// Jitter a delay in memtable application to get test coverage of
// varying interleavings of which batch completes memtable
Expand Down Expand Up @@ -363,8 +363,8 @@ func BenchmarkCommitPipeline(b *testing.B) {
mem := newMemTable(memTableOptions{})
var wal *record.LogWriter
nullCommitEnv := commitEnv{
logSeqNum: new(atomic.Uint64),
visibleSeqNum: new(atomic.Uint64),
logSeqNum: new(base.AtomicSeqNum),
visibleSeqNum: new(base.AtomicSeqNum),
apply: func(b *Batch, mem *memTable) error {
err := mem.apply(b, b.SeqNum())
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,8 @@ type compaction struct {

// inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
// input sstables.
func (c *compaction) inputLargestSeqNumAbsolute() uint64 {
var seqNum uint64
func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum {
var seqNum base.SeqNum
for _, cl := range c.inputs {
cl.files.Each(func(m *manifest.FileMetadata) {
seqNum = max(seqNum, m.LargestSeqNumAbsolute)
Expand Down Expand Up @@ -1889,15 +1889,15 @@ type deleteCompactionHint struct {
// tombstone smallest sequence number to be deleted. All of a tables'
// sequence numbers must fall into the same snapshot stripe as the
// tombstone largest sequence number to be deleted.
tombstoneLargestSeqNum uint64
tombstoneSmallestSeqNum uint64
tombstoneLargestSeqNum base.SeqNum
tombstoneSmallestSeqNum base.SeqNum
// The smallest sequence number of a sstable that was found to be covered
// by this hint. The hint cannot be resolved until this sequence number is
// in the same snapshot stripe as the largest tombstone sequence number.
// This is set when a hint is created, so the LSM may look different and
// notably no longer contain the sstable that contained the key at this
// sequence number.
fileSmallestSeqNum uint64
fileSmallestSeqNum base.SeqNum
}

func (h deleteCompactionHint) String() string {
Expand Down
Loading

0 comments on commit ff83363

Please sign in to comment.