diff --git a/batch.go b/batch.go index c77896d25a..96c7b8c013 100644 --- a/batch.go +++ b/batch.go @@ -317,7 +317,7 @@ type batchInternal struct { // tombstonesSeqNum. This is the case for all new iterators created over a // batch but it's not the case for all cloned iterators. tombstones []keyspan.Span - tombstonesSeqNum uint64 + tombstonesSeqNum base.SeqNum // Fragmented range key spans. Cached the first time a range key iterator is // requested. The cache is invalidated whenever a new range key @@ -326,7 +326,7 @@ type batchInternal struct { // tombstonesSeqNum. This is the case for all new iterators created over a // batch but it's not the case for all cloned iterators. rangeKeys []keyspan.Span - rangeKeysSeqNum uint64 + rangeKeysSeqNum base.SeqNum // The flushableBatch wrapper if the batch is too large to fit in the // memtable. @@ -487,8 +487,8 @@ func newIndexedBatchWithSize(db *DB, comparer *Comparer, size int) *Batch { // with the base.InternalKeySeqNumBatch bit. These sequence numbers are only // used during iteration, and the keys are assigned ordinary sequence numbers // when the batch is committed. -func (b *Batch) nextSeqNum() uint64 { - return uint64(len(b.data)) | base.InternalKeySeqNumBatch +func (b *Batch) nextSeqNum() base.SeqNum { + return base.SeqNum(len(b.data)) | base.InternalKeySeqNumBatch } func (b *Batch) release() { @@ -1310,7 +1310,7 @@ func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) { } } -func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter { +func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter { // Construct an iterator even if rangeDelIndex is nil, because it is allowed // to refresh later, so we need the container to exist. iter := new(keyspan.Iter) @@ -1318,7 +1318,7 @@ func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot uint64) *keyspan.I return iter } -func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) { +func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) { if b.rangeDelIndex == nil { iter.Init(b.cmp, nil) return @@ -1386,7 +1386,7 @@ func fragmentRangeDels(frag *keyspan.Fragmenter, it internalIterator, count int) frag.Finish() } -func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot uint64) *keyspan.Iter { +func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keyspan.Iter { // Construct an iterator even if rangeKeyIndex is nil, because it is allowed // to refresh later, so we need the container to exist. iter := new(keyspan.Iter) @@ -1394,7 +1394,7 @@ func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot uint64) *keyspan.I return iter } -func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot uint64) { +func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) { if b.rangeKeyIndex == nil { iter.Init(b.cmp, nil) return @@ -1596,14 +1596,14 @@ func (b *Batch) grow(n int) { b.data = b.data[:newSize] } -func (b *Batch) setSeqNum(seqNum uint64) { +func (b *Batch) setSeqNum(seqNum base.SeqNum) { batchrepr.SetSeqNum(b.data, seqNum) } // SeqNum returns the batch sequence number which is applied to the first // record in the batch. The sequence number is incremented for each subsequent // record. It returns zero if the batch is empty. 
-func (b *Batch) SeqNum() uint64 { +func (b *Batch) SeqNum() base.SeqNum { if len(b.data) == 0 { b.init(batchrepr.HeaderLen) } @@ -1666,7 +1666,7 @@ type batchIter struct { // read. This sequence number has the InternalKeySeqNumBatch bit set, so it // encodes an offset within the batch. Only batch entries earlier than the // offset are visible during iteration. - snapshot uint64 + snapshot base.SeqNum } // batchIter implements the base.InternalIterator interface. @@ -1849,7 +1849,7 @@ type flushableBatch struct { // The base sequence number for the entries in the batch. This is the same // value as Batch.seqNum() and is cached here for performance. - seqNum uint64 + seqNum base.SeqNum // A slice of offsets and indices for the entries in the batch. Used to // implement flushableBatchIter. Unlike the indexing on a normal batch, a @@ -1997,7 +1997,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error return b, nil } -func (b *flushableBatch) setSeqNum(seqNum uint64) { +func (b *flushableBatch) setSeqNum(seqNum base.SeqNum) { if b.seqNum != 0 { panic(fmt.Sprintf("pebble: flushableBatch.seqNum already set: %d", b.seqNum)) } @@ -2266,7 +2266,7 @@ func (i *flushableBatchIter) getKey(index int) InternalKey { e := &i.offsets[index] kind := InternalKeyKind(i.data[e.offset]) key := i.data[e.keyStart:e.keyEnd] - return base.MakeInternalKey(key, i.batch.seqNum+uint64(e.index), kind) + return base.MakeInternalKey(key, i.batch.seqNum+base.SeqNum(e.index), kind) } func (i *flushableBatchIter) getKV(index int) *base.InternalKV { diff --git a/batch_test.go b/batch_test.go index e9e78d79ae..946257e435 100644 --- a/batch_test.go +++ b/batch_test.go @@ -12,7 +12,6 @@ import ( "io" "math" "math/rand" - "strconv" "strings" "sync" "testing" @@ -292,7 +291,7 @@ func testBatchEmpty(t *testing.T, size int, opts ...BatchOption) { require.True(t, b.Empty()) b = newBatchWithSize(nil, size) - require.Equal(t, uint64(0), b.SeqNum()) + require.Equal(t, base.SeqNumZero, b.SeqNum()) require.True(t, b.Empty()) b.Reset() require.True(t, b.Empty()) @@ -396,7 +395,7 @@ func TestBatchReset(t *testing.T) { require.Equal(t, uint64(0), b.countRangeDels) require.Equal(t, uint64(0), b.countRangeKeys) require.Equal(t, batchrepr.HeaderLen, len(b.data)) - require.Equal(t, uint64(0), b.SeqNum()) + require.Equal(t, base.SeqNumZero, b.SeqNum()) require.Equal(t, uint64(0), b.memTableSize) require.Equal(t, FormatMajorVersion(0x00), b.minimumFormatMajorVersion) require.Equal(t, b.deferredOp, DeferredBatchOp{}) @@ -1188,11 +1187,8 @@ func TestFlushableBatch(t *testing.T) { if len(d.CmdArgs) != 1 || len(d.CmdArgs[0].Vals) != 1 || d.CmdArgs[0].Key != "seq" { return "dump seq=\n" } - seqNum, err := strconv.Atoi(d.CmdArgs[0].Vals[0]) - if err != nil { - return err.Error() - } - b.setSeqNum(uint64(seqNum)) + seqNum := base.ParseSeqNum(d.CmdArgs[0].Vals[0]) + b.setSeqNum(seqNum) var buf bytes.Buffer diff --git a/batchrepr/reader.go b/batchrepr/reader.go index ebb14d7819..652e94b8b5 100644 --- a/batchrepr/reader.go +++ b/batchrepr/reader.go @@ -48,7 +48,7 @@ func ReadHeader(repr []byte) (h Header, ok bool) { type Header struct { // SeqNum is the sequence number at which the batch is committed. A batch // that has not yet committed will have a zero sequence number. - SeqNum uint64 + SeqNum base.SeqNum // Count is the count of keys written to the batch. Count uint32 } @@ -62,8 +62,8 @@ func (h Header) String() string { // does not validate that the repr is valid. 
It's exported only for very // performance sensitive code paths that should not necessarily read the rest of // the header as well. -func ReadSeqNum(repr []byte) uint64 { - return binary.LittleEndian.Uint64(repr[:countOffset]) +func ReadSeqNum(repr []byte) base.SeqNum { + return base.SeqNum(binary.LittleEndian.Uint64(repr[:countOffset])) } // Read constructs a Reader from an encoded batch representation, ignoring the diff --git a/batchrepr/writer.go b/batchrepr/writer.go index 3069856ea3..cd68ef1c39 100644 --- a/batchrepr/writer.go +++ b/batchrepr/writer.go @@ -4,13 +4,17 @@ package batchrepr -import "encoding/binary" +import ( + "encoding/binary" + + "github.com/cockroachdb/pebble/internal/base" +) // SetSeqNum mutates the provided batch representation, storing the provided // sequence number in its header. The provided byte slice must already be at // least HeaderLen bytes long or else SetSeqNum will panic. -func SetSeqNum(repr []byte, seqNum uint64) { - binary.LittleEndian.PutUint64(repr[:countOffset], seqNum) +func SetSeqNum(repr []byte, seqNum base.SeqNum) { + binary.LittleEndian.PutUint64(repr[:countOffset], uint64(seqNum)) } // SetCount mutates the provided batch representation, storing the provided diff --git a/batchrepr/writer_test.go b/batchrepr/writer_test.go index 133e8eaf3f..1f7585c542 100644 --- a/batchrepr/writer_test.go +++ b/batchrepr/writer_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/binfmt" ) @@ -37,10 +38,7 @@ func TestWriter(t *testing.T) { return prettyBinaryRepr(repr) case "set-seqnum": - seqNum, err := strconv.ParseUint(td.CmdArgs[0].Key, 10, 64) - if err != nil { - return err.Error() - } + seqNum := base.ParseSeqNum(td.CmdArgs[0].Key) SetSeqNum(repr, seqNum) return prettyBinaryRepr(repr) diff --git a/commit.go b/commit.go index 3eb61d8bd3..a75ba0c31b 100644 --- a/commit.go +++ b/commit.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/pebble/batchrepr" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/record" ) @@ -126,10 +127,10 @@ func (q *commitQueue) dequeueApplied() *Batch { type commitEnv struct { // The next sequence number to give to a batch. Protected by // commitPipeline.mu. - logSeqNum *atomic.Uint64 + logSeqNum *base.AtomicSeqNum // The visible sequence number at which reads should be performed. Ratcheted // upwards atomically as batches are applied to the memtable. - visibleSeqNum *atomic.Uint64 + visibleSeqNum *base.AtomicSeqNum // Apply the batch to the specified memtable. Called concurrently. apply func(b *Batch, mem *memTable) error @@ -360,7 +361,7 @@ func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error { // invoked with commitPipeline.mu held, but note that DB.mu is not held and // must be locked if necessary. func (p *commitPipeline) AllocateSeqNum( - count int, prepare func(seqNum uint64), apply func(seqNum uint64), + count int, prepare func(seqNum base.SeqNum), apply func(seqNum base.SeqNum), ) { // This method is similar to Commit and prepare. Be careful about trying to // share additional code with those methods because Commit and prepare are @@ -387,7 +388,7 @@ func (p *commitPipeline) AllocateSeqNum( // Assign the batch a sequence number. Note that we use atomic operations // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides // mutual exclusion for other goroutines writing to logSeqNum. 
- logSeqNum := p.env.logSeqNum.Add(uint64(count)) - uint64(count) + logSeqNum := p.env.logSeqNum.Add(base.SeqNum(count)) - base.SeqNum(count) seqNum := logSeqNum if seqNum == 0 { // We can't use the value 0 for the global seqnum during ingestion, because @@ -461,7 +462,7 @@ func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memT // Assign the batch a sequence number. Note that we use atomic operations // here to handle concurrent reads of logSeqNum. commitPipeline.mu provides // mutual exclusion for other goroutines writing to logSeqNum. - b.setSeqNum(p.env.logSeqNum.Add(n) - n) + b.setSeqNum(p.env.logSeqNum.Add(base.SeqNum(n)) - base.SeqNum(n)) // Write the data to the WAL. mem, err := p.env.write(b, syncWG, syncErr) @@ -502,7 +503,7 @@ func (p *commitPipeline) publish(b *Batch) { // that the sequence number ratchets up. for { curSeqNum := p.env.visibleSeqNum.Load() - newSeqNum := t.SeqNum() + uint64(t.Count()) + newSeqNum := t.SeqNum() + base.SeqNum(t.Count()) if newSeqNum <= curSeqNum { // t's sequence number has already been published. break diff --git a/commit_test.go b/commit_test.go index 30d3d7b89a..bf9be21b4d 100644 --- a/commit_test.go +++ b/commit_test.go @@ -24,8 +24,8 @@ import ( ) type testCommitEnv struct { - logSeqNum atomic.Uint64 - visibleSeqNum atomic.Uint64 + logSeqNum base.AtomicSeqNum + visibleSeqNum base.AtomicSeqNum writeCount atomic.Uint64 applyBuf struct { sync.Mutex @@ -45,7 +45,7 @@ func (e *testCommitEnv) env() commitEnv { func (e *testCommitEnv) apply(b *Batch, mem *memTable) error { e.applyBuf.Lock() - e.applyBuf.buf = append(e.applyBuf.buf, b.SeqNum()) + e.applyBuf.buf = append(e.applyBuf.buf, uint64(b.SeqNum())) e.applyBuf.Unlock() return nil } @@ -115,10 +115,10 @@ func TestCommitPipeline(t *testing.T) { t.Fatalf("expected %d written batches, but found %d", n, len(e.applyBuf.buf)) } - if s := e.logSeqNum.Load(); uint64(n) != s { + if s := e.logSeqNum.Load(); base.SeqNum(n) != s { t.Fatalf("expected %d, but found %d", n, s) } - if s := e.visibleSeqNum.Load(); uint64(n) != s { + if s := e.visibleSeqNum.Load(); base.SeqNum(n) != s { t.Fatalf("expected %d, but found %d", n, s) } } @@ -160,10 +160,10 @@ func TestCommitPipelineSync(t *testing.T) { t.Fatalf("expected %d written batches, but found %d", n, len(e.applyBuf.buf)) } - if s := e.logSeqNum.Load(); uint64(n) != s { + if s := e.logSeqNum.Load(); base.SeqNum(n) != s { t.Fatalf("expected %d, but found %d", n, s) } - if s := e.visibleSeqNum.Load(); uint64(n) != s { + if s := e.visibleSeqNum.Load(); base.SeqNum(n) != s { t.Fatalf("expected %d, but found %d", n, s) } }) @@ -182,9 +182,9 @@ func TestCommitPipelineAllocateSeqNum(t *testing.T) { for i := 1; i <= n; i++ { go func(i int) { defer wg.Done() - p.AllocateSeqNum(i, func(_ uint64) { + p.AllocateSeqNum(i, func(_ base.SeqNum) { prepareCount.Add(1) - }, func(seqNum uint64) { + }, func(_ base.SeqNum) { applyCount.Add(1) }) }(i) @@ -238,8 +238,8 @@ func TestCommitPipelineWALClose(t *testing.T) { var wal *record.LogWriter var walDone sync.WaitGroup testEnv := commitEnv{ - logSeqNum: new(atomic.Uint64), - visibleSeqNum: new(atomic.Uint64), + logSeqNum: new(base.AtomicSeqNum), + visibleSeqNum: new(base.AtomicSeqNum), apply: func(b *Batch, mem *memTable) error { // At this point, we've called SyncRecord but the sync is blocked. 
walDone.Done() @@ -302,8 +302,8 @@ func TestCommitPipelineWALClose(t *testing.T) { func TestCommitPipelineLogDataSeqNum(t *testing.T) { var testEnv commitEnv testEnv = commitEnv{ - logSeqNum: new(atomic.Uint64), - visibleSeqNum: new(atomic.Uint64), + logSeqNum: new(base.AtomicSeqNum), + visibleSeqNum: new(base.AtomicSeqNum), apply: func(b *Batch, mem *memTable) error { // Jitter a delay in memtable application to get test coverage of // varying interleavings of which batch completes memtable @@ -363,8 +363,8 @@ func BenchmarkCommitPipeline(b *testing.B) { mem := newMemTable(memTableOptions{}) var wal *record.LogWriter nullCommitEnv := commitEnv{ - logSeqNum: new(atomic.Uint64), - visibleSeqNum: new(atomic.Uint64), + logSeqNum: new(base.AtomicSeqNum), + visibleSeqNum: new(base.AtomicSeqNum), apply: func(b *Batch, mem *memTable) error { err := mem.apply(b, b.SeqNum()) if err != nil { diff --git a/compaction.go b/compaction.go index 536fb0f531..d5ebd2cea3 100644 --- a/compaction.go +++ b/compaction.go @@ -255,8 +255,8 @@ type compaction struct { // inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any // input sstables. -func (c *compaction) inputLargestSeqNumAbsolute() uint64 { - var seqNum uint64 +func (c *compaction) inputLargestSeqNumAbsolute() base.SeqNum { + var seqNum base.SeqNum for _, cl := range c.inputs { cl.files.Each(func(m *manifest.FileMetadata) { seqNum = max(seqNum, m.LargestSeqNumAbsolute) @@ -1889,15 +1889,15 @@ type deleteCompactionHint struct { // tombstone smallest sequence number to be deleted. All of a tables' // sequence numbers must fall into the same snapshot stripe as the // tombstone largest sequence number to be deleted. - tombstoneLargestSeqNum uint64 - tombstoneSmallestSeqNum uint64 + tombstoneLargestSeqNum base.SeqNum + tombstoneSmallestSeqNum base.SeqNum // The smallest sequence number of a sstable that was found to be covered // by this hint. The hint cannot be resolved until this sequence number is // in the same snapshot stripe as the largest tombstone sequence number. // This is set when a hint is created, so the LSM may look different and // notably no longer contain the sstable that contained the key at this // sequence number. - fileSmallestSeqNum uint64 + fileSmallestSeqNum base.SeqNum } func (h deleteCompactionHint) String() string { diff --git a/compaction_picker.go b/compaction_picker.go index 1fc71c7780..4724a4b4c8 100644 --- a/compaction_picker.go +++ b/compaction_picker.go @@ -33,8 +33,8 @@ type compactionEnv struct { // flushes are longer, slower operations and provide a much looser bound // when available bytes is decreasing. diskAvailBytes uint64 - earliestUnflushedSeqNum uint64 - earliestSnapshotSeqNum uint64 + earliestUnflushedSeqNum base.SeqNum + earliestSnapshotSeqNum base.SeqNum inProgressCompactions []compactionInfo readCompactionEnv readCompactionEnv } @@ -1041,7 +1041,7 @@ func pickCompactionSeedFile( virtualBackings *manifest.VirtualBackings, opts *Options, level, outputLevel int, - earliestSnapshotSeqNum uint64, + earliestSnapshotSeqNum base.SeqNum, ) (manifest.LevelFile, bool) { // Select the file within the level to compact. 
We want to minimize write // amplification, but also ensure that (a) deletes are propagated to the diff --git a/compaction_picker_test.go b/compaction_picker_test.go index ba636f0f8b..2635321136 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -80,9 +80,9 @@ func loadVersion(t *testing.T, d *datadriven.TestData) (*version, *Options, stri var key InternalKey if level == 0 { // For L0, make `size` overlapping files. - key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", 1)), i, InternalKeyKindSet) + key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", 1)), base.SeqNum(i), InternalKeyKindSet) } else { - key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet) + key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), base.SeqNum(i), InternalKeyKindSet) } m := (&fileMetadata{ FileNum: base.FileNum(uint64(level)*100_000 + i), @@ -107,7 +107,7 @@ func loadVersion(t *testing.T, d *datadriven.TestData) (*version, *Options, stri // TestCompactionPickerTargetLevel. Clean this up somehow. m.Size = size if level != 0 { - endKey := base.MakeInternalKey([]byte(fmt.Sprintf("%04d", size)), i, InternalKeyKindSet) + endKey := base.MakeInternalKey([]byte(fmt.Sprintf("%04d", size)), base.SeqNum(i), InternalKeyKindSet) m.ExtendPointKeyBounds(opts.Comparer.Compare, key, endKey) } } diff --git a/compaction_test.go b/compaction_test.go index 05adc218e6..5b21d4f4d1 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -1423,11 +1423,11 @@ func TestCompactionDeleteOnlyHints(t *testing.T) { hintType: hintType, start: start, end: end, - fileSmallestSeqNum: parseUint64(parts[4]), + fileSmallestSeqNum: base.SeqNum(parseUint64(parts[4])), tombstoneLevel: tombstoneLevel, tombstoneFile: tombstoneFile, - tombstoneSmallestSeqNum: parseUint64(parts[5]), - tombstoneLargestSeqNum: parseUint64(parts[6]), + tombstoneSmallestSeqNum: base.SeqNum(parseUint64(parts[5])), + tombstoneLargestSeqNum: base.SeqNum(parseUint64(parts[6])), } d.mu.compact.deletionHints = append(d.mu.compact.deletionHints, h) fmt.Fprintln(&buf, h.String()) @@ -1497,10 +1497,7 @@ func TestCompactionDeleteOnlyHints(t *testing.T) { return s case "close-snapshot": - seqNum, err := strconv.ParseUint(strings.TrimSpace(td.Input), 0, 64) - if err != nil { - return err.Error() - } + seqNum := base.ParseSeqNum(strings.TrimSpace(td.Input)) d.mu.Lock() var s *Snapshot l := &d.mu.snapshots @@ -1633,10 +1630,7 @@ func TestCompactionTombstones(t *testing.T) { return runTableStatsCmd(td, d) case "close-snapshot": - seqNum, err := strconv.ParseUint(strings.TrimSpace(td.Input), 0, 64) - if err != nil { - return err.Error() - } + seqNum := base.ParseSeqNum(strings.TrimSpace(td.Input)) d.mu.Lock() var s *Snapshot l := &d.mu.snapshots diff --git a/data_test.go b/data_test.go index 5ea1ba6600..1118b006ce 100644 --- a/data_test.go +++ b/data_test.go @@ -41,7 +41,11 @@ func runGetCmd(t testing.TB, td *datadriven.TestData, d *DB) string { db: d, seqNum: InternalKeySeqNumMax, } - td.MaybeScanArgs(t, "seq", &snap.seqNum) + if td.HasArg("seq") { + var n uint64 + td.ScanArgs(t, "seq", &n) + snap.seqNum = base.SeqNum(n) + } var buf bytes.Buffer for _, data := range strings.Split(td.Input, "\n") { @@ -767,7 +771,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error) { opts = opts.EnsureDefaults() - var snapshots []uint64 + var snapshots []base.SeqNum var levelMaxBytes map[int]int64 for _, arg := range td.CmdArgs { switch 
arg.Key { @@ -781,13 +785,9 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error) opts.Levels[i].TargetFileSize = size } case "snapshots": - snapshots = make([]uint64, len(arg.Vals)) + snapshots = make([]base.SeqNum, len(arg.Vals)) for i := range arg.Vals { - seqNum, err := strconv.ParseUint(arg.Vals[i], 10, 64) - if err != nil { - return nil, err - } - snapshots[i] = seqNum + snapshots[i] = base.ParseSeqNum(arg.Vals[i]) if i > 0 && snapshots[i] < snapshots[i-1] { return nil, errors.New("Snapshots must be in ascending order") } diff --git a/db.go b/db.go index 7a8cf541e9..034e77bb63 100644 --- a/db.go +++ b/db.go @@ -554,7 +554,7 @@ func (d *DB) getInternal(key []byte, b *Batch, s *Snapshot) ([]byte, io.Closer, // Determine the seqnum to read at after grabbing the read state (current and // memtables) above. - var seqNum uint64 + var seqNum base.SeqNum if s != nil { seqNum = s.seqNum } else { @@ -1013,7 +1013,7 @@ var iterAllocPool = sync.Pool{ // - EFOS that has been excised but is in alwaysCreateIters mode (tests only). // Only `seqNum` and `readState` are set. type snapshotIterOpts struct { - seqNum uint64 + seqNum base.SeqNum vers *version readState *readState } @@ -1255,7 +1255,7 @@ func (d *DB) ScanInternal( categoryAndQoS sstable.CategoryAndQoS, lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeDel func(start, end []byte, seqNum SeqNum) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, visitExternalFile func(sst *ExternalFile) error, @@ -2343,7 +2343,7 @@ func (d *DB) walPreallocateSize() int { } func (d *DB) newMemTable( - logNum base.DiskFileNum, logSeqNum, minSize uint64, + logNum base.DiskFileNum, logSeqNum base.SeqNum, minSize uint64, ) (*memTable, *flushableEntry) { targetSize := minSize + uint64(memTableEmptySize) // The targetSize should be less than MemTableSize, because any batch >= @@ -2425,7 +2425,7 @@ func (d *DB) freeMemTable(m *memTable) { } func (d *DB) newFlushableEntry( - f flushable, logNum base.DiskFileNum, logSeqNum uint64, + f flushable, logNum base.DiskFileNum, logSeqNum base.SeqNum, ) *flushableEntry { fe := &flushableEntry{ flushable: f, @@ -2546,12 +2546,12 @@ func (d *DB) makeRoomForWrite(b *Batch) error { imm := d.mu.mem.queue[len(d.mu.mem.queue)-1] imm.logSize = prevLogSize - var logSeqNum uint64 + var logSeqNum base.SeqNum var minSize uint64 if b != nil { logSeqNum = b.SeqNum() if b.flushable != nil { - logSeqNum += uint64(b.Count()) + logSeqNum += base.SeqNum(b.Count()) // The batch is too large to fit in the memtable so add it directly to // the immutable queue. The flushable batch is associated with the same // log as the immutable memtable, but logically occurs after it in @@ -2575,7 +2575,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error { // b == nil // // This is a manual forced flush. - logSeqNum = d.mu.versions.logSeqNum.Load() + logSeqNum = base.SeqNum(d.mu.versions.logSeqNum.Load()) imm.flushForced = true // If we are manually flushing and we used less than half of the bytes in // the memtable, don't increase the size for the next memtable. This @@ -2603,7 +2603,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error { // Both DB.mu and commitPipeline.mu must be held by the caller. 
func (d *DB) rotateMemtable( - newLogNum base.DiskFileNum, logSeqNum uint64, prev *memTable, minSize uint64, + newLogNum base.DiskFileNum, logSeqNum base.SeqNum, prev *memTable, minSize uint64, ) { // Create a new memtable, scheduling the previous one for flushing. We do // this even if the previous memtable was empty because the DB.Flush @@ -2683,7 +2683,7 @@ func (d *DB) rotateWAL() (newLogNum base.DiskFileNum, prevLogSize uint64) { return newLogNum, prevLogSize } -func (d *DB) getEarliestUnflushedSeqNumLocked() uint64 { +func (d *DB) getEarliestUnflushedSeqNumLocked() base.SeqNum { seqNum := InternalKeySeqNumMax for i := range d.mu.mem.queue { logSeqNum := d.mu.mem.queue[i].logSeqNum @@ -2864,7 +2864,7 @@ func (d *DB) ScanStatistics( stats.BytesRead += uint64(key.Size() + value.Len()) return nil }, - visitRangeDel: func(start, end []byte, seqNum uint64) error { + visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error { stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++ stats.BytesRead += uint64(len(start) + len(end)) return nil diff --git a/db_test.go b/db_test.go index 79dd337ac8..4864b65261 100644 --- a/db_test.go +++ b/db_test.go @@ -384,7 +384,7 @@ func TestLargeBatch(t *testing.T) { require.NoError(t, err) return logs[len(logs)-1] } - memTableCreationSeqNum := func() uint64 { + memTableCreationSeqNum := func() base.SeqNum { d.mu.Lock() defer d.mu.Unlock() return d.mu.mem.mutable.logSeqNum diff --git a/download.go b/download.go index 4aad4aee89..3c4a331a4e 100644 --- a/download.go +++ b/download.go @@ -292,7 +292,7 @@ type downloadCursor struct { // Inclusive lower bound for sequence number for tables on level with // Smallest.UserKey equaling key. Used to break ties within L0, and also used // to position a cursor immediately after a given file. - seqNum uint64 + seqNum base.SeqNum } var endCursor = downloadCursor{level: manifest.NumLevels} diff --git a/event.go b/event.go index 194709fc23..b859d7a914 100644 --- a/event.go +++ b/event.go @@ -384,7 +384,7 @@ type TableIngestInfo struct { } // GlobalSeqNum is the sequence number that was assigned to all entries in // the ingested table. - GlobalSeqNum uint64 + GlobalSeqNum base.SeqNum // flushable indicates whether the ingested sstable was treated as a // flushable. flushable bool diff --git a/flushable.go b/flushable.go index 241ddcfcc8..bc2915c2f2 100644 --- a/flushable.go +++ b/flushable.go @@ -85,7 +85,7 @@ type flushableEntry struct { logSize uint64 // The current logSeqNum at the time the memtable was created. This is // guaranteed to be less than or equal to any seqnum stored in the memtable. - logSeqNum uint64 + logSeqNum base.SeqNum // readerRefs tracks the read references on the flushable. The two sources of // reader references are DB.mu.mem.queue and readState.memtables. The memory // reserved by the flushable in the cache is released when the reader refs diff --git a/get_iter.go b/get_iter.go index 44a4c890e2..fc50a9f73f 100644 --- a/get_iter.go +++ b/get_iter.go @@ -20,7 +20,7 @@ import ( type getIter struct { comparer *Comparer newIters tableNewIters - snapshot uint64 + snapshot base.SeqNum iterOpts IterOptions key []byte prefix []byte @@ -36,7 +36,7 @@ type getIter struct { // deletion encounterd transitions tombstoned to true. The tombstonedSeqNum // field is updated to hold the sequence number of the tombstone. 
tombstoned bool - tombstonedSeqNum uint64 + tombstonedSeqNum base.SeqNum err error } diff --git a/ingest.go b/ingest.go index 7f4c9f3151..0c380ce2d6 100644 --- a/ingest.go +++ b/ingest.go @@ -756,7 +756,9 @@ func (d *DB) ingestUnprotectExternalBackings(lr ingestLoadResult) { } } -func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format base.FormatKey) error { +func setSeqNumInMetadata( + m *fileMetadata, seqNum base.SeqNum, cmp Compare, format base.FormatKey, +) error { setSeqFn := func(k base.InternalKey) base.InternalKey { return base.MakeInternalKey(k.UserKey, seqNum, k.Kind()) } @@ -795,7 +797,7 @@ func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format bas } func ingestUpdateSeqNum( - cmp Compare, format base.FormatKey, seqNum uint64, loadResult ingestLoadResult, + cmp Compare, format base.FormatKey, seqNum base.SeqNum, loadResult ingestLoadResult, ) error { // Shared sstables are required to be sorted by level ascending. We then // iterate the shared sstables in reverse, assigning the lower sequence @@ -1181,7 +1183,7 @@ func (d *DB) IngestAndExcise( // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( - meta []*fileMetadata, seqNum uint64, logNum base.DiskFileNum, exciseSpan KeyRange, + meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the @@ -1192,7 +1194,7 @@ func (d *DB) newIngestedFlushableEntry( // time, then we'll lose the ingest sequence number information. But this // information will also be reconstructed on node restart. for i, m := range meta { - if err := setSeqNumInMetadata(m, seqNum+uint64(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { return nil, err } } @@ -1228,7 +1230,7 @@ func (d *DB) newIngestedFlushableEntry( // recycle the WAL in this function is irrelevant as long as the correct log // numbers are assigned to the appropriate flushable. func (d *DB) handleIngestAsFlushable( - meta []*fileMetadata, seqNum uint64, exciseSpan KeyRange, + meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange, ) error { b := d.NewBatch() for _, m := range meta { @@ -1264,7 +1266,7 @@ func (d *DB) handleIngestAsFlushable( if err != nil { return err } - nextSeqNum := seqNum + uint64(b.Count()) + nextSeqNum := seqNum + base.SeqNum(b.Count()) // Set newLogNum to the logNum of the previous flushable. This value is // irrelevant if the WAL is disabled. If the WAL is enabled, then we set @@ -1382,7 +1384,7 @@ func (d *DB) ingest( var mut *memTable // asFlushable indicates whether the sstable was ingested as a flushable. var asFlushable bool - prepare := func(seqNum uint64) { + prepare := func(seqNum base.SeqNum) { // Note that d.commit.mu is held by commitPipeline when calling prepare. // Determine the set of bounds we care about for the purpose of checking @@ -1550,7 +1552,7 @@ func (d *DB) ingest( } var ve *versionEdit - apply := func(seqNum uint64) { + apply := func(seqNum base.SeqNum) { if err != nil || asFlushable { // An error occurred during prepare. 
if mut != nil { @@ -2083,7 +2085,7 @@ func (d *DB) ingestApply( lr ingestLoadResult, mut *memTable, exciseSpan KeyRange, - exciseSeqNum uint64, + exciseSeqNum base.SeqNum, ) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() diff --git a/ingest_test.go b/ingest_test.go index e5ce6f99b7..6549d4b1ba 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1072,7 +1072,7 @@ func testIngestSharedImpl( require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) return nil }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { require.NoError(t, w.DeleteRange(start, end)) return nil }, @@ -1570,7 +1570,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) return nil }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { require.NoError(t, w.DeleteRange(start, end)) return nil }, @@ -2005,7 +2005,7 @@ func TestIngestExternal(t *testing.T) { require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val)) return nil }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { require.NoError(t, w.DeleteRange(start, end)) return nil }, @@ -3220,17 +3220,13 @@ func TestIngest_UpdateSequenceNumber(t *testing.T) { } var ( - seqNum uint64 - err error + seqNum base.SeqNum metas []*fileMetadata ) datadriven.RunTest(t, "testdata/ingest_update_seqnums", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "starting-seqnum": - seqNum, err = strconv.ParseUint(td.Input, 10, 64) - if err != nil { - return err.Error() - } + seqNum = base.ParseSeqNum(td.Input) return "" case "reset": @@ -3301,7 +3297,7 @@ func TestIngest_UpdateSequenceNumber(t *testing.T) { case "update-files": // Update the bounds across all files. for i, m := range metas { - if err := setSeqNumInMetadata(m, seqNum+uint64(i), cmp, base.DefaultFormatter); err != nil { + if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), cmp, base.DefaultFormatter); err != nil { return err.Error() } } diff --git a/internal.go b/internal.go index 27635bedd6..90718d02ff 100644 --- a/internal.go +++ b/internal.go @@ -35,6 +35,9 @@ const ( // InternalKey exports the base.InternalKey type. type InternalKey = base.InternalKey +// SeqNum exports the base.SeqNum type. +type SeqNum = base.SeqNum + type internalIterator = base.InternalIterator type topLevelIterator = base.TopLevelIterator diff --git a/internal/arenaskl/node.go b/internal/arenaskl/node.go index d464bc56b4..43307de976 100644 --- a/internal/arenaskl/node.go +++ b/internal/arenaskl/node.go @@ -47,7 +47,7 @@ type node struct { // Immutable fields, so no need to lock to access key. keyOffset uint32 keySize uint32 - keyTrailer uint64 + keyTrailer base.Trailer valueSize uint32 allocSize uint32 diff --git a/internal/base/internal.go b/internal/base/internal.go index 6ee4303a68..40188def4c 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -10,17 +10,36 @@ import ( "fmt" "strconv" "strings" + "sync/atomic" "github.com/cockroachdb/redact" ) +// SeqNum is a sequence number defining precedence among identical keys. A key +// with a higher sequence number takes precedence over a key with an equal user +// key of a lower sequence number. Sequence numbers are stored durably within +// the internal key "trailer" as a 7-byte (uint56) uint, and the maximum +// sequence number is 2^56-1. 
As keys are committed to the database, they're +// assigned increasing sequence numbers. Readers use sequence numbers to read a +// consistent database state, ignoring keys with sequence numbers larger than +// the readers' "visible sequence number." +// +// The database maintains an invariant that no two point keys with equal user +// keys may have equal sequence numbers. Keys with differing user keys may have +// equal sequence numbers. A point key and a range deletion or range key that +// includes that point key can have equal sequence numbers - in that case, the +// range key does not apply to the point key. A key's sequence number may be +// changed to zero during compactions when it can be proven that no identical +// keys with lower sequence numbers exist. +type SeqNum uint64 + const ( // SeqNumZero is the zero sequence number, set by compactions if they can // guarantee there are no keys underneath an internal key. - SeqNumZero = uint64(0) + SeqNumZero SeqNum = 0 // SeqNumStart is the first sequence number assigned to a key. Sequence // numbers 1-9 are reserved for potential future use. - SeqNumStart = uint64(10) + SeqNumStart SeqNum = 10 ) // InternalKeyKind enumerates the kind of key: a deletion tombstone, a set @@ -114,30 +133,30 @@ const ( // InternalKeyZeroSeqnumMaxTrailer is the largest trailer with a // zero sequence number. - InternalKeyZeroSeqnumMaxTrailer = uint64(255) + InternalKeyZeroSeqnumMaxTrailer Trailer = 255 // A marker for an invalid key. InternalKeyKindInvalid InternalKeyKind = InternalKeyKindSSTableInternalObsoleteMask // InternalKeySeqNumBatch is a bit that is set on batch sequence numbers // which prevents those entries from being excluded from iteration. - InternalKeySeqNumBatch = uint64(1 << 55) + InternalKeySeqNumBatch SeqNum = 1 << 55 // InternalKeySeqNumMax is the largest valid sequence number. - InternalKeySeqNumMax = uint64(1<<56 - 1) + InternalKeySeqNumMax SeqNum = 1<<56 - 1 // InternalKeyRangeDeleteSentinel is the marker for a range delete sentinel // key. This sequence number and kind are used for the upper stable boundary // when a range deletion tombstone is the largest key in an sstable. This is // necessary because sstable boundaries are inclusive, while the end key of a // range deletion tombstone is exclusive. - InternalKeyRangeDeleteSentinel = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeDelete) + InternalKeyRangeDeleteSentinel = (Trailer(InternalKeySeqNumMax) << 8) | Trailer(InternalKeyKindRangeDelete) // InternalKeyBoundaryRangeKey is the marker for a range key boundary. This // sequence number and kind are used during interleaved range key and point // iteration to allow an iterator to stop at range key start keys where // there exists no point key. - InternalKeyBoundaryRangeKey = (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindRangeKeySet) + InternalKeyBoundaryRangeKey = (Trailer(InternalKeySeqNumMax) << 8) | Trailer(InternalKeyKindRangeKeySet) ) // Assert InternalKeyKindSSTableInternalObsoleteBit > InternalKeyKindMax @@ -172,6 +191,25 @@ func (k InternalKeyKind) SafeFormat(w redact.SafePrinter, _ rune) { w.Print(redact.SafeString(k.String())) } +// Trailer encodes a SeqNum and an InternalKeyKind. +type Trailer uint64 + +// MakeTrailer constructs an internal key trailer from the specified sequence +// number and kind. +func MakeTrailer(seqNum SeqNum, kind InternalKeyKind) Trailer { + return (Trailer(seqNum) << 8) | Trailer(kind) +} + +// SeqNum returns the sequence number component of the trailer. 
+func (t Trailer) SeqNum() SeqNum { + return SeqNum(t >> 8) +} + +// Kind returns the key kind component of the trailer. +func (t Trailer) Kind() InternalKeyKind { + return InternalKeyKind(t & 0xff) +} + // InternalKey is a key used for the in-memory and on-disk partial DBs that // make up a pebble DB. // @@ -181,37 +219,28 @@ func (k InternalKeyKind) SafeFormat(w redact.SafePrinter, _ rune) { // - 7 bytes for a uint56 sequence number, in little-endian format. type InternalKey struct { UserKey []byte - Trailer uint64 + Trailer Trailer } // InvalidInternalKey is an invalid internal key for which Valid() will return // false. -var InvalidInternalKey = MakeInternalKey(nil, 0, InternalKeyKindInvalid) +var InvalidInternalKey = MakeInternalKey(nil, SeqNumZero, InternalKeyKindInvalid) // MakeInternalKey constructs an internal key from a specified user key, // sequence number and kind. -func MakeInternalKey(userKey []byte, seqNum uint64, kind InternalKeyKind) InternalKey { +func MakeInternalKey(userKey []byte, seqNum SeqNum, kind InternalKeyKind) InternalKey { return InternalKey{ UserKey: userKey, - Trailer: (seqNum << 8) | uint64(kind), + Trailer: MakeTrailer(seqNum, kind), } } -// MakeTrailer constructs an internal key trailer from the specified sequence -// number and kind. -func MakeTrailer(seqNum uint64, kind InternalKeyKind) uint64 { - return (seqNum << 8) | uint64(kind) -} - // MakeSearchKey constructs an internal key that is appropriate for searching // for the specified user key. The search key contains the maximal sequence // number and kind ensuring that it sorts before any other internal keys for // the same user key. func MakeSearchKey(userKey []byte) InternalKey { - return InternalKey{ - UserKey: userKey, - Trailer: (InternalKeySeqNumMax << 8) | uint64(InternalKeyKindMax), - } + return MakeInternalKey(userKey, InternalKeySeqNumMax, InternalKeyKindMax) } // MakeRangeDeleteSentinelKey constructs an internal key that is a range @@ -228,10 +257,7 @@ func MakeRangeDeleteSentinelKey(userKey []byte) InternalKey { // exclusive sentinel key, used as the upper boundary for an sstable // when a ranged key is the largest key in an sstable. func MakeExclusiveSentinelKey(kind InternalKeyKind, userKey []byte) InternalKey { - return InternalKey{ - UserKey: userKey, - Trailer: (InternalKeySeqNumMax << 8) | uint64(kind), - } + return MakeInternalKey(userKey, InternalKeySeqNumMax, kind) } var kindsMap = map[string]InternalKeyKind{ @@ -256,20 +282,38 @@ var kindsMap = map[string]InternalKeyKind{ // is marked as a batch-seq-num (i.e. the InternalKeySeqNumBatch bit is set). func ParseInternalKey(s string) InternalKey { x := strings.Split(s, ".") + if len(x) != 3 { + panic(fmt.Sprintf("invalid internal key %q", s)) + } ukey := x[0] kind, ok := kindsMap[x[1]] if !ok { panic(fmt.Sprintf("unknown kind: %q", x[1])) } - j := 0 - if x[2][0] == 'b' { - j = 1 + seqNum := ParseSeqNum(x[2]) + return MakeInternalKey([]byte(ukey), seqNum, kind) +} + +// ParseSeqNum parses the string representation of a sequence number. +// "inf" is supported as the maximum sequence number (mainly used for exclusive +// end keys). 
+func ParseSeqNum(s string) SeqNum { + if s == "inf" { + return InternalKeySeqNumMax } - seqNum, _ := strconv.ParseUint(x[2][j:], 10, 64) - if x[2][0] == 'b' { + batch := s[0] == 'b' + if batch { + s = s[1:] + } + n, err := strconv.ParseUint(s, 10, 64) + if err != nil { + panic(fmt.Sprintf("error parsing %q as seqnum: %s", s, err)) + } + seqNum := SeqNum(n) + if batch { seqNum |= InternalKeySeqNumBatch } - return MakeInternalKey([]byte(ukey), seqNum, kind) + return seqNum } // ParseKind parses the string representation of an internal key kind. @@ -287,12 +331,12 @@ const InternalTrailerLen = 8 // DecodeInternalKey decodes an encoded internal key. See InternalKey.Encode(). func DecodeInternalKey(encodedKey []byte) InternalKey { n := len(encodedKey) - InternalTrailerLen - var trailer uint64 + var trailer Trailer if n >= 0 { - trailer = binary.LittleEndian.Uint64(encodedKey[n:]) + trailer = Trailer(binary.LittleEndian.Uint64(encodedKey[n:])) encodedKey = encodedKey[:n:n] } else { - trailer = uint64(InternalKeyKindInvalid) + trailer = Trailer(InternalKeyKindInvalid) encodedKey = nil } return InternalKey{ @@ -318,13 +362,13 @@ func InternalCompare(userCmp Compare, a, b InternalKey) int { // to hold the encoded data. See InternalKey.Size(). func (k InternalKey) Encode(buf []byte) { i := copy(buf, k.UserKey) - binary.LittleEndian.PutUint64(buf[i:], k.Trailer) + binary.LittleEndian.PutUint64(buf[i:], uint64(k.Trailer)) } // EncodeTrailer returns the trailer encoded to an 8-byte array. func (k InternalKey) EncodeTrailer() [8]byte { var buf [8]byte - binary.LittleEndian.PutUint64(buf[:], k.Trailer) + binary.LittleEndian.PutUint64(buf[:], uint64(k.Trailer)) return buf } @@ -371,13 +415,13 @@ func (k InternalKey) Size() int { } // SetSeqNum sets the sequence number component of the key. -func (k *InternalKey) SetSeqNum(seqNum uint64) { - k.Trailer = (seqNum << 8) | (k.Trailer & 0xff) +func (k *InternalKey) SetSeqNum(seqNum SeqNum) { + k.Trailer = (Trailer(seqNum) << 8) | (k.Trailer & 0xff) } // SeqNum returns the sequence number component of the key. -func (k InternalKey) SeqNum() uint64 { - return k.Trailer >> 8 +func (k InternalKey) SeqNum() SeqNum { + return SeqNum(k.Trailer >> 8) } // IsUpperBoundFor returns true if a range ending in k contains the userKey: @@ -388,20 +432,15 @@ func (k InternalKey) IsUpperBoundFor(cmp Compare, userKey []byte) bool { return c < 0 || (c == 0 && !k.IsExclusiveSentinel()) } -// SeqNumFromTrailer returns the sequence number component of a trailer. -func SeqNumFromTrailer(t uint64) uint64 { - return t >> 8 -} - // Visible returns true if the key is visible at the specified snapshot // sequence number. -func (k InternalKey) Visible(snapshot, batchSnapshot uint64) bool { +func (k InternalKey) Visible(snapshot, batchSnapshot SeqNum) bool { return Visible(k.SeqNum(), snapshot, batchSnapshot) } // Visible returns true if a key with the provided sequence number is visible at // the specified snapshot sequence numbers. -func Visible(seqNum uint64, snapshot, batchSnapshot uint64) bool { +func Visible(seqNum SeqNum, snapshot, batchSnapshot SeqNum) bool { // There are two snapshot sequence numbers, one for committed keys and one // for batch keys. If a seqNum is less than `snapshot`, then seqNum // corresponds to a committed key that is visible. If seqNum has its batch @@ -421,17 +460,12 @@ func Visible(seqNum uint64, snapshot, batchSnapshot uint64) bool { // SetKind sets the kind component of the key. 
func (k *InternalKey) SetKind(kind InternalKeyKind) { - k.Trailer = (k.Trailer &^ 0xff) | uint64(kind) + k.Trailer = (k.Trailer &^ 0xff) | Trailer(kind) } // Kind returns the kind component of the key. func (k InternalKey) Kind() InternalKeyKind { - return TrailerKind(k.Trailer) -} - -// TrailerKind returns the key kind of the key trailer. -func TrailerKind(trailer uint64) InternalKeyKind { - return InternalKeyKind(trailer & 0xff) + return k.Trailer.Kind() } // Valid returns true if the key has a valid kind. @@ -474,7 +508,7 @@ func (k InternalKey) Pretty(f FormatKey) fmt.Formatter { // with the same user key if used as an end boundary. See the comment on // InternalKeyRangeDeletionSentinel. func (k InternalKey) IsExclusiveSentinel() bool { - if (k.Trailer >> 8) != InternalKeySeqNumMax { + if k.SeqNum() != InternalKeySeqNumMax { return false } switch kind := k.Kind(); kind { @@ -510,11 +544,11 @@ func ParsePrettyInternalKey(s string) InternalKey { } var seqNum uint64 if x[1] == "max" || x[1] == "inf" { - seqNum = InternalKeySeqNumMax + seqNum = uint64(InternalKeySeqNumMax) } else { seqNum, _ = strconv.ParseUint(x[1], 10, 64) } - return MakeInternalKey([]byte(ukey), seqNum, kind) + return MakeInternalKey([]byte(ukey), SeqNum(seqNum), kind) } // MakeInternalKV constructs an InternalKV with the provided internal key and @@ -538,7 +572,7 @@ func (kv *InternalKV) Kind() InternalKeyKind { } // SeqNum returns the KV's internal key sequence number. -func (kv *InternalKV) SeqNum() uint64 { +func (kv *InternalKV) SeqNum() SeqNum { return kv.K.SeqNum() } @@ -554,7 +588,7 @@ func (kv *InternalKV) Value(buf []byte) (val []byte, callerOwned bool, err error // Visible returns true if the key is visible at the specified snapshot // sequence number. -func (kv *InternalKV) Visible(snapshot, batchSnapshot uint64) bool { +func (kv *InternalKV) Visible(snapshot, batchSnapshot SeqNum) bool { return Visible(kv.K.SeqNum(), snapshot, batchSnapshot) } @@ -564,3 +598,28 @@ func (kv *InternalKV) Visible(snapshot, batchSnapshot uint64) bool { func (kv *InternalKV) IsExclusiveSentinel() bool { return kv.K.IsExclusiveSentinel() } + +// AtomicSeqNum is an atomic SeqNum. +type AtomicSeqNum struct { + value atomic.Uint64 +} + +// Load atomically loads and returns the stored SeqNum. +func (asn *AtomicSeqNum) Load() SeqNum { + return SeqNum(asn.value.Load()) +} + +// Store atomically stores s. +func (asn *AtomicSeqNum) Store(s SeqNum) { + asn.value.Store(uint64(s)) +} + +// Add atomically adds delta to asn and returns the new value. +func (asn *AtomicSeqNum) Add(delta SeqNum) SeqNum { + return SeqNum(asn.value.Add(uint64(delta))) +} + +// CompareAndSwap executes the compare-and-swap operation. 
+func (asn *AtomicSeqNum) CompareAndSwap(old, new SeqNum) bool { + return asn.value.CompareAndSwap(uint64(old), uint64(new)) +} diff --git a/internal/base/internal_test.go b/internal/base/internal_test.go index 39466cd97d..755a5956b4 100644 --- a/internal/base/internal_test.go +++ b/internal/base/internal_test.go @@ -30,7 +30,7 @@ func TestInternalKey(t *testing.T) { if got, want := k.Kind(), InternalKeyKind(1); got != want { t.Errorf("kind = %d want %d", got, want) } - if got, want := k.SeqNum(), uint64(0x08070605040302); got != want { + if got, want := k.SeqNum(), SeqNum(0x08070605040302); got != want { t.Errorf("seqNum = %d want %d", got, want) } } diff --git a/internal/base/test_utils.go b/internal/base/test_utils.go index 08871b735d..0c87f903d9 100644 --- a/internal/base/test_utils.go +++ b/internal/base/test_utils.go @@ -69,7 +69,7 @@ func fakeIkey(s string) InternalKey { if err != nil { panic(err) } - return MakeInternalKey([]byte(s[:j]), uint64(seqNum), InternalKeyKindSet) + return MakeInternalKey([]byte(s[:j]), SeqNum(seqNum), InternalKeyKindSet) } // NewFakeIter returns an iterator over the given KVs. diff --git a/internal/batchskl/skl.go b/internal/batchskl/skl.go index f56d95c28a..ece24ec6e2 100644 --- a/internal/batchskl/skl.go +++ b/internal/batchskl/skl.go @@ -415,7 +415,7 @@ func (s *Skiplist) getKey(nd uint32) base.InternalKey { n := s.node(nd) kind := base.InternalKeyKind((*s.storage)[n.offset]) key := (*s.storage)[n.keyStart:n.keyEnd] - return base.MakeInternalKey(key, uint64(n.offset)|base.InternalKeySeqNumBatch, kind) + return base.MakeInternalKey(key, base.SeqNum(n.offset)|base.InternalKeySeqNumBatch, kind) } func (s *Skiplist) getNext(nd, h uint32) uint32 { diff --git a/internal/compact/iterator.go b/internal/compact/iterator.go index 375f56a4ff..69f287ad3e 100644 --- a/internal/compact/iterator.go +++ b/internal/compact/iterator.go @@ -176,7 +176,7 @@ type Iter struct { // keyTrailer is updated when `i.key` is updated and holds the key's // original trailer (eg, before any sequence-number zeroing or changes to // key kind). - keyTrailer uint64 + keyTrailer base.Trailer value []byte valueCloser io.Closer // Temporary buffer used for storing the previous user key in order to @@ -223,7 +223,7 @@ type Iter struct { forceObsoleteDueToRangeDel bool // The index of the snapshot for the current key within the snapshots slice. curSnapshotIdx int - curSnapshotSeqNum uint64 + curSnapshotSeqNum base.SeqNum // frontiers holds a heap of user keys that affect compaction behavior when // they're exceeded. Before a new key is returned, the compaction iterator // advances the frontier, notifying any code that subscribed to be notified @@ -738,7 +738,7 @@ func (i *Iter) nextInStripeHelper() stripeChangeType { // were ingested, but range keys are interleaved into the compaction // iterator's input iterator at the maximal sequence number so their // original sequence number will not be observed here. - if prevSeqNum := base.SeqNumFromTrailer(i.keyTrailer); (prevSeqNum == 0 || prevSeqNum <= kv.SeqNum()) && + if prevSeqNum := i.keyTrailer.SeqNum(); (prevSeqNum == 0 || prevSeqNum <= kv.SeqNum()) && i.key.Kind() != base.InternalKeyKindRangeDelete && kv.Kind() != base.InternalKeyKindRangeDelete { prevKey := i.key prevKey.Trailer = i.keyTrailer @@ -1312,7 +1312,7 @@ const ( // The key's UserKey must be greater or equal to the last span Start key passed // to AddTombstoneSpan. The keys passed to tombstoneCovers calls must be // ordered. 
-func (i *Iter) tombstoneCovers(key base.InternalKey, snapshot uint64) cover { +func (i *Iter) tombstoneCovers(key base.InternalKey, snapshot base.SeqNum) cover { if i.lastRangeDelSpan.Empty() { return noCover } diff --git a/internal/compact/iterator_test.go b/internal/compact/iterator_test.go index f52a9b15ec..ba3376280c 100644 --- a/internal/compact/iterator_test.go +++ b/internal/compact/iterator_test.go @@ -158,11 +158,7 @@ func TestCompactionIter(t *testing.T) { switch arg.Key { case "snapshots": for _, val := range arg.Vals { - seqNum, err := strconv.Atoi(val) - if err != nil { - return err.Error() - } - snapshots = append(snapshots, uint64(seqNum)) + snapshots = append(snapshots, base.ParseSeqNum(val)) } case "elide-tombstones": var err error diff --git a/internal/compact/snapshots.go b/internal/compact/snapshots.go index 80e3e4e856..5828b6d2b6 100644 --- a/internal/compact/snapshots.go +++ b/internal/compact/snapshots.go @@ -43,11 +43,11 @@ import ( // -- -- // a.DEL.6 ---> a.DEL.6 // a.PUT.5 -type Snapshots []uint64 +type Snapshots []base.SeqNum // Index returns the index of the first snapshot sequence number which is >= seq // or len(s) if there is no such sequence number. -func (s Snapshots) Index(seq uint64) int { +func (s Snapshots) Index(seq base.SeqNum) int { return sort.Search(len(s), func(i int) bool { return s[i] > seq }) @@ -56,7 +56,7 @@ func (s Snapshots) Index(seq uint64) int { // IndexAndSeqNum returns the index of the first snapshot sequence number which // is >= seq and that sequence number, or len(s) and InternalKeySeqNumMax if // there is no such sequence number. -func (s Snapshots) IndexAndSeqNum(seq uint64) (int, uint64) { +func (s Snapshots) IndexAndSeqNum(seq base.SeqNum) (int, base.SeqNum) { index := s.Index(seq) if index == len(s) { return index, base.InternalKeySeqNumMax diff --git a/internal/compact/snapshots_test.go b/internal/compact/snapshots_test.go index 86f8500bac..ce374701ef 100644 --- a/internal/compact/snapshots_test.go +++ b/internal/compact/snapshots_test.go @@ -12,20 +12,20 @@ import ( func TestSnapshotIndex(t *testing.T) { testCases := []struct { - snapshots []uint64 - seq uint64 + snapshots []base.SeqNum + seq base.SeqNum expectedIndex int - expectedSeqNum uint64 + expectedSeqNum base.SeqNum }{ - {snapshots: []uint64{}, seq: 1, expectedIndex: 0, expectedSeqNum: base.InternalKeySeqNumMax}, - {snapshots: []uint64{1}, seq: 0, expectedIndex: 0, expectedSeqNum: 1}, - {snapshots: []uint64{1}, seq: 1, expectedIndex: 1, expectedSeqNum: base.InternalKeySeqNumMax}, - {snapshots: []uint64{1}, seq: 2, expectedIndex: 1, expectedSeqNum: base.InternalKeySeqNumMax}, - {snapshots: []uint64{1, 3}, seq: 1, expectedIndex: 1, expectedSeqNum: 3}, - {snapshots: []uint64{1, 3}, seq: 2, expectedIndex: 1, expectedSeqNum: 3}, - {snapshots: []uint64{1, 3}, seq: 3, expectedIndex: 2, expectedSeqNum: base.InternalKeySeqNumMax}, - {snapshots: []uint64{1, 3}, seq: 4, expectedIndex: 2, expectedSeqNum: base.InternalKeySeqNumMax}, - {snapshots: []uint64{1, 3, 3}, seq: 2, expectedIndex: 1, expectedSeqNum: 3}, + {snapshots: []base.SeqNum{}, seq: 1, expectedIndex: 0, expectedSeqNum: base.InternalKeySeqNumMax}, + {snapshots: []base.SeqNum{1}, seq: 0, expectedIndex: 0, expectedSeqNum: 1}, + {snapshots: []base.SeqNum{1}, seq: 1, expectedIndex: 1, expectedSeqNum: base.InternalKeySeqNumMax}, + {snapshots: []base.SeqNum{1}, seq: 2, expectedIndex: 1, expectedSeqNum: base.InternalKeySeqNumMax}, + {snapshots: []base.SeqNum{1, 3}, seq: 1, expectedIndex: 1, expectedSeqNum: 3}, + {snapshots: 
[]base.SeqNum{1, 3}, seq: 2, expectedIndex: 1, expectedSeqNum: 3}, + {snapshots: []base.SeqNum{1, 3}, seq: 3, expectedIndex: 2, expectedSeqNum: base.InternalKeySeqNumMax}, + {snapshots: []base.SeqNum{1, 3}, seq: 4, expectedIndex: 2, expectedSeqNum: base.InternalKeySeqNumMax}, + {snapshots: []base.SeqNum{1, 3, 3}, seq: 2, expectedIndex: 1, expectedSeqNum: 3}, } for _, c := range testCases { t.Run("", func(t *testing.T) { diff --git a/internal/compact/spans_test.go b/internal/compact/spans_test.go index 4e312695bd..622c77e8c9 100644 --- a/internal/compact/spans_test.go +++ b/internal/compact/spans_test.go @@ -26,13 +26,17 @@ func TestRangeDelSpanCompactor(t *testing.T) { case "compact": var snapshots []uint64 td.MaybeScanArgs(t, "snapshots", &snapshots) + var s Snapshots + for _, v := range snapshots { + s = append(s, base.SeqNum(v)) + } keyRanges := maybeParseInUseKeyRanges(td) span := keyspan.ParseSpan(td.Input) c = MakeRangeDelSpanCompactor( base.DefaultComparer.Compare, base.DefaultComparer.Equal, - snapshots, + s, ElideTombstonesOutsideOf(keyRanges), ) @@ -58,13 +62,17 @@ func TestRangeKeySpanCompactor(t *testing.T) { case "compact": var snapshots []uint64 td.MaybeScanArgs(t, "snapshots", &snapshots) + var s Snapshots + for _, v := range snapshots { + s = append(s, base.SeqNum(v)) + } keyRanges := maybeParseInUseKeyRanges(td) span := keyspan.ParseSpan(td.Input) c = MakeRangeKeySpanCompactor( base.DefaultComparer.Compare, base.DefaultComparer.Equal, - snapshots, + s, ElideTombstonesOutsideOf(keyRanges), ) diff --git a/internal/keyspan/defragment_test.go b/internal/keyspan/defragment_test.go index f3fde82386..8ea6afcd80 100644 --- a/internal/keyspan/defragment_test.go +++ b/internal/keyspan/defragment_test.go @@ -132,7 +132,7 @@ func testDefragmentingIteRandomizedOnce(t *testing.T, seed int64) { } key := Key{ - Trailer: base.MakeTrailer(uint64(i), base.InternalKeyKindRangeKeySet), + Trailer: base.MakeTrailer(base.SeqNum(i), base.InternalKeyKindRangeKeySet), Value: []byte(fmt.Sprintf("v%d", rng.Intn(3))), } // Generate suffixes 0, 1, 2, or 3 with 0 indicating none. diff --git a/internal/keyspan/fragmenter_test.go b/internal/keyspan/fragmenter_test.go index a513eea3e9..9502ba0a81 100644 --- a/internal/keyspan/fragmenter_test.go +++ b/internal/keyspan/fragmenter_test.go @@ -8,7 +8,6 @@ import ( "bytes" "fmt" "regexp" - "strconv" "strings" "testing" @@ -24,14 +23,13 @@ func parseSpanSingleKey(t *testing.T, s string, kind base.InternalKeyKind) Span if len(m) != 5 { t.Fatalf("expected 5 components, but found %d: %s", len(m), s) } - seqNum, err := strconv.Atoi(m[1]) - require.NoError(t, err) + seqNum := base.ParseSeqNum(m[1]) return Span{ Start: []byte(m[2]), End: []byte(m[3]), Keys: []Key{ { - Trailer: base.MakeTrailer(uint64(seqNum), kind), + Trailer: base.MakeTrailer(seqNum, kind), Value: []byte(strings.TrimSpace(m[4])), }, }, @@ -105,14 +103,12 @@ func TestFragmenter(t *testing.T) { var getRe = regexp.MustCompile(`(\w+)#(\d+)`) - parseGet := func(t *testing.T, s string) (string, int) { + parseGet := func(t *testing.T, s string) (string, base.SeqNum) { m := getRe.FindStringSubmatch(s) if len(m) != 3 { t.Fatalf("expected 3 components, but found %d", len(m)) } - seq, err := strconv.Atoi(m[2]) - require.NoError(t, err) - return m[1], seq + return m[1], base.ParseSeqNum(m[2]) } var iter FragmentIterator @@ -121,7 +117,7 @@ func TestFragmenter(t *testing.T) { // read sequence number. Get ignores spans newer than the read sequence // number. 
This is a simple version of what full processing of range // tombstones looks like. - deleted := func(key []byte, seq, readSeq uint64) bool { + deleted := func(key []byte, seq, readSeq base.SeqNum) bool { s, err := Get(cmp, iter, key) require.NoError(t, err) return s != nil && s.CoversAt(readSeq, seq) @@ -149,13 +145,12 @@ func TestFragmenter(t *testing.T) { if d.CmdArgs[0].Key != "t" { return fmt.Sprintf("expected timestamp argument, but found %s", d.CmdArgs[0]) } - readSeq, err := strconv.Atoi(d.CmdArgs[0].Vals[0]) - require.NoError(t, err) + readSeq := base.ParseSeqNum(d.CmdArgs[0].Vals[0]) var results []string for _, p := range strings.Split(d.Input, " ") { key, seq := parseGet(t, p) - if deleted([]byte(key), uint64(seq), uint64(readSeq)) { + if deleted([]byte(key), seq, readSeq) { results = append(results, "deleted") } else { results = append(results, "alive") diff --git a/internal/keyspan/keyspanimpl/merging_iter_test.go b/internal/keyspan/keyspanimpl/merging_iter_test.go index e17ad0223d..c0eb1681a7 100644 --- a/internal/keyspan/keyspanimpl/merging_iter_test.go +++ b/internal/keyspan/keyspanimpl/merging_iter_test.go @@ -51,9 +51,7 @@ func TestMergingIter(t *testing.T) { for _, cmdArg := range td.CmdArgs { switch cmdArg.Key { case "snapshot": - var err error - snapshot, err = strconv.ParseUint(cmdArg.Vals[0], 10, 64) - require.NoError(t, err) + snapshot = base.ParseSeqNum(cmdArg.Vals[0]) case "probes": // The first value indicates which of the merging iterator's // child iterators is the target. @@ -134,7 +132,7 @@ func testFragmenterEquivalenceOnce(t *testing.T, seed int64) { Keys: make([]keyspan.Key, 0, keyCount), } for k := keyCount; k > 0; k-- { - seqNum := uint64((len(levels)-l)*3) + k + seqNum := base.SeqNum((len(levels)-l)*3) + base.SeqNum(k) s.Keys = append(s.Keys, keyspan.Key{ Trailer: base.MakeTrailer(seqNum, base.InternalKeyKindRangeKeySet), }) diff --git a/internal/keyspan/seek_test.go b/internal/keyspan/seek_test.go index 331475c336..93f2a66393 100644 --- a/internal/keyspan/seek_test.go +++ b/internal/keyspan/seek_test.go @@ -7,7 +7,6 @@ package keyspan import ( "bytes" "fmt" - "strconv" "strings" "testing" @@ -49,10 +48,7 @@ func TestSeek(t *testing.T) { if len(parts) != 2 { return fmt.Sprintf("malformed input: %s", line) } - seq, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - return err.Error() - } + seq := base.ParseSeqNum(parts[1]) span, err := seek(cmp, iter, []byte(parts[0])) if err != nil { fmt.Fprintf(&buf, " \n", err) diff --git a/internal/keyspan/span.go b/internal/keyspan/span.go index 3aa07eef50..af84a1bb94 100644 --- a/internal/keyspan/span.go +++ b/internal/keyspan/span.go @@ -9,7 +9,6 @@ import ( "fmt" "slices" "sort" - "strconv" "strings" "unicode" @@ -62,7 +61,7 @@ const ( // is applied. type Key struct { // Trailer contains the key kind and sequence number. - Trailer uint64 + Trailer base.Trailer // Suffix holds an optional suffix associated with the key. This is only // non-nil for RANGEKEYSET and RANGEKEYUNSET keys. Suffix []byte @@ -73,15 +72,15 @@ type Key struct { } // SeqNum returns the sequence number component of the key. -func (k Key) SeqNum() uint64 { - return k.Trailer >> 8 +func (k Key) SeqNum() base.SeqNum { + return k.Trailer.SeqNum() } // VisibleAt returns true if the provided key is visible at the provided // snapshot sequence number. It interprets batch sequence numbers as always // visible, because non-visible batch span keys are filtered when they're // fragmented. 
-func (k Key) VisibleAt(snapshot uint64) bool { +func (k Key) VisibleAt(snapshot base.SeqNum) bool { seq := k.SeqNum() return seq < snapshot || seq&base.InternalKeySeqNumBatch != 0 } @@ -192,7 +191,7 @@ func (s *Span) LargestKey() base.InternalKey { // SmallestSeqNum returns the smallest sequence number of a key contained within // the span. It requires the Span's keys be in ByTrailerDesc order. It panics if // the span contains no keys or its keys are sorted in a different order. -func (s *Span) SmallestSeqNum() uint64 { +func (s *Span) SmallestSeqNum() base.SeqNum { if len(s.Keys) == 0 { panic("pebble: Span contains no keys") } else if s.KeysOrder != ByTrailerDesc { @@ -205,7 +204,7 @@ func (s *Span) SmallestSeqNum() uint64 { // LargestSeqNum returns the largest sequence number of a key contained within // the span. It requires the Span's keys be in ByTrailerDesc order. It panics if // the span contains no keys or its keys are sorted in a different order. -func (s *Span) LargestSeqNum() uint64 { +func (s *Span) LargestSeqNum() base.SeqNum { if len(s.Keys) == 0 { panic("pebble: Span contains no keys") } else if s.KeysOrder != ByTrailerDesc { @@ -218,7 +217,7 @@ func (s *Span) LargestSeqNum() uint64 { // within the span that's also visible at the provided snapshot sequence number. // It requires the Span's keys be in ByTrailerDesc order. It panics if the span // contains no keys or its keys are sorted in a different order. -func (s *Span) LargestVisibleSeqNum(snapshot uint64) (largest uint64, ok bool) { +func (s *Span) LargestVisibleSeqNum(snapshot base.SeqNum) (largest base.SeqNum, ok bool) { if s == nil { return 0, false } else if len(s.Keys) == 0 { @@ -243,7 +242,7 @@ func (s *Span) LargestVisibleSeqNum(snapshot uint64) (largest uint64, ok bool) { // // Visible may incur an allocation, so callers should prefer targeted, // non-allocating methods when possible. -func (s Span) Visible(snapshot uint64) Span { +func (s Span) Visible(snapshot base.SeqNum) Span { if s.KeysOrder != ByTrailerDesc { panic("pebble: span's keys unexpectedly not in trailer order") } @@ -320,7 +319,7 @@ func (s Span) Visible(snapshot uint64) Span { // // VisibleAt requires the Span's keys be in ByTrailerDesc order. It panics if // the span's keys are sorted in a different order. -func (s *Span) VisibleAt(snapshot uint64) bool { +func (s *Span) VisibleAt(snapshot base.SeqNum) bool { if s.KeysOrder != ByTrailerDesc { panic("pebble: span's keys unexpectedly not in trailer order") } @@ -364,7 +363,7 @@ func (s *Span) Contains(cmp base.Compare, key []byte) bool { // // Covers requires the Span's keys be in ByTrailerDesc order. It panics if the // span's keys are sorted in a different order. -func (s Span) Covers(seqNum uint64) bool { +func (s Span) Covers(seqNum base.SeqNum) bool { if s.KeysOrder != ByTrailerDesc { panic("pebble: span's keys unexpectedly not in trailer order") } @@ -380,7 +379,7 @@ func (s Span) Covers(seqNum uint64) bool { // // CoversAt requires the Span's keys be in ByTrailerDesc order. It panics if the // span's keys are sorted in a different order. -func (s *Span) CoversAt(snapshot, seqNum uint64) bool { +func (s *Span) CoversAt(snapshot, seqNum base.SeqNum) bool { if s.KeysOrder != ByTrailerDesc { panic("pebble: span's keys unexpectedly not in trailer order") } @@ -502,12 +501,7 @@ func ParseSpan(input string) Span { }) var k Key - // Parse the sequence number. 
- seqNum, err := strconv.ParseUint(keyFields[0], 10, 64) - if err != nil { - panic(fmt.Sprintf("invalid sequence number: %q: %s", keyFields[0], err)) - } - // Parse the key kind. + seqNum := base.ParseSeqNum(keyFields[0]) kind := base.ParseKind(keyFields[1]) k.Trailer = base.MakeTrailer(seqNum, kind) // Parse the optional suffix. diff --git a/internal/keyspan/span_test.go b/internal/keyspan/span_test.go index 29651fb41f..8603545e12 100644 --- a/internal/keyspan/span_test.go +++ b/internal/keyspan/span_test.go @@ -7,12 +7,11 @@ package keyspan import ( "bytes" "fmt" - "strconv" "strings" "testing" "github.com/cockroachdb/datadriven" - "github.com/stretchr/testify/require" + "github.com/cockroachdb/pebble/internal/base" ) // TODO(jackson): Add unit tests for all of the various Span methods. @@ -41,8 +40,7 @@ func TestSpan_Visible(t *testing.T) { case "visible": var buf bytes.Buffer for _, line := range strings.Split(d.Input, "\n") { - snapshot, err := strconv.ParseUint(line, 10, 64) - require.NoError(t, err) + snapshot := base.ParseSeqNum(line) fmt.Fprintf(&buf, "%-2d: %s\n", snapshot, s.Visible(snapshot)) } return buf.String() @@ -62,8 +60,7 @@ func TestSpan_VisibleAt(t *testing.T) { case "visible-at": var buf bytes.Buffer for _, line := range strings.Split(d.Input, "\n") { - snapshot, err := strconv.ParseUint(line, 10, 64) - require.NoError(t, err) + snapshot := base.ParseSeqNum(line) fmt.Fprintf(&buf, "%-2d: %t\n", snapshot, s.VisibleAt(snapshot)) } return buf.String() @@ -84,11 +81,9 @@ func TestSpan_CoversAt(t *testing.T) { var buf bytes.Buffer for _, line := range strings.Split(d.Input, "\n") { fields := strings.Fields(line) - snapshot, err := strconv.ParseUint(fields[0], 10, 64) - require.NoError(t, err) - seqNum, err := strconv.ParseUint(fields[1], 10, 64) - require.NoError(t, err) - fmt.Fprintf(&buf, "%d %d : %t\n", snapshot, seqNum, s.CoversAt(snapshot, seqNum)) + snapshot := base.ParseSeqNum(fields[0]) + seqNum := base.ParseSeqNum(fields[1]) + fmt.Fprintf(&buf, "%d %d : %t\n", snapshot, seqNum, s.CoversAt(base.SeqNum(snapshot), base.SeqNum(seqNum))) } return buf.String() default: diff --git a/internal/keyspan/transformer.go b/internal/keyspan/transformer.go index fe1f1fd080..3914ad4a68 100644 --- a/internal/keyspan/transformer.go +++ b/internal/keyspan/transformer.go @@ -33,7 +33,7 @@ var NoopTransform Transformer = TransformerFunc(func(_ base.Compare, s Span, dst // VisibleTransform filters keys that are invisible at the provided snapshot // sequence number. -func VisibleTransform(snapshot uint64) Transformer { +func VisibleTransform(snapshot base.SeqNum) Transformer { return TransformerFunc(func(_ base.Compare, s Span, dst *Span) error { dst.Start, dst.End = s.Start, s.End dst.Keys = dst.Keys[:0] diff --git a/internal/manifest/l0_sublevels.go b/internal/manifest/l0_sublevels.go index d3665985c3..04cef07ff2 100644 --- a/internal/manifest/l0_sublevels.go +++ b/internal/manifest/l0_sublevels.go @@ -1152,7 +1152,7 @@ type L0CompactionFiles struct { // Set for intra-L0 compactions. SSTables with sequence numbers greater // than earliestUnflushedSeqNum cannot be a part of intra-L0 compactions. isIntraL0 bool - earliestUnflushedSeqNum uint64 + earliestUnflushedSeqNum base.SeqNum // For debugging purposes only. Used in checkCompaction(). preExtensionMinInterval int @@ -1563,7 +1563,7 @@ func (s *L0Sublevels) baseCompactionUsingSeed( // compaction is possible (i.e. does not conflict with any base/intra-L0 // compacting files). 
func (s *L0Sublevels) extendFiles( - sl int, earliestUnflushedSeqNum uint64, cFiles *L0CompactionFiles, + sl int, earliestUnflushedSeqNum base.SeqNum, cFiles *L0CompactionFiles, ) bool { index, _ := slices.BinarySearchFunc(s.levelFiles[sl], cFiles.minIntervalIndex, func(a *FileMetadata, b int) int { return stdcmp.Compare(a.maxIntervalIndex, b) @@ -1595,7 +1595,7 @@ func (s *L0Sublevels) extendFiles( // See comment above [PickBaseCompaction] for heuristics involved in this // selection. func (s *L0Sublevels) PickIntraL0Compaction( - earliestUnflushedSeqNum uint64, minCompactionDepth int, + earliestUnflushedSeqNum base.SeqNum, minCompactionDepth int, ) (*L0CompactionFiles, error) { scoredIntervals := make([]intervalAndScore, len(s.orderedIntervals)) for i := range s.orderedIntervals { @@ -1663,7 +1663,7 @@ func (s *L0Sublevels) PickIntraL0Compaction( } func (s *L0Sublevels) intraL0CompactionUsingSeed( - f *FileMetadata, intervalIndex int, earliestUnflushedSeqNum uint64, minCompactionDepth int, + f *FileMetadata, intervalIndex int, earliestUnflushedSeqNum base.SeqNum, minCompactionDepth int, ) *L0CompactionFiles { // We know that all the files that overlap with intervalIndex have // LargestSeqNum < earliestUnflushedSeqNum, but for other intervals diff --git a/internal/manifest/l0_sublevels_test.go b/internal/manifest/l0_sublevels_test.go index 279ee5ef2d..b8d5551af9 100644 --- a/internal/manifest/l0_sublevels_test.go +++ b/internal/manifest/l0_sublevels_test.go @@ -338,7 +338,7 @@ func TestL0Sublevels(t *testing.T) { fallthrough case "pick-intra-l0-compaction": minCompactionDepth := 3 - earliestUnflushedSeqNum := uint64(math.MaxUint64) + earliestUnflushedSeqNum := base.SeqNum(math.MaxUint64) for _, arg := range td.CmdArgs { switch arg.Key { case "min_depth": @@ -347,11 +347,7 @@ func TestL0Sublevels(t *testing.T) { t.Fatal(err) } case "earliest_unflushed_seqnum": - eusnInt, err := strconv.Atoi(arg.Vals[0]) - if err != nil { - t.Fatal(err) - } - earliestUnflushedSeqNum = uint64(eusnInt) + earliestUnflushedSeqNum = base.ParseSeqNum(arg.Vals[0]) } } @@ -534,12 +530,12 @@ func TestAddL0FilesEquivalence(t *testing.T) { meta := (&FileMetadata{ FileNum: base.FileNum(i*10 + j + 1), Size: rng.Uint64n(1 << 20), - SmallestSeqNum: uint64(2*i + 1), - LargestSeqNum: uint64(2*i + 2), - LargestSeqNumAbsolute: uint64(2*i + 2), + SmallestSeqNum: base.SeqNum(2*i + 1), + LargestSeqNum: base.SeqNum(2*i + 2), + LargestSeqNumAbsolute: base.SeqNum(2*i + 2), }).ExtendPointKeyBounds( base.DefaultComparer.Compare, - base.MakeInternalKey(startKey, uint64(2*i+1), base.InternalKeyKindSet), + base.MakeInternalKey(startKey, base.SeqNum(2*i+1), base.InternalKeyKindSet), base.MakeRangeDeleteSentinelKey(endKey), ) meta.InitPhysicalBacking() diff --git a/internal/manifest/testutils.go b/internal/manifest/testutils.go index 784735ba7d..6a608d7ddf 100644 --- a/internal/manifest/testutils.go +++ b/internal/manifest/testutils.go @@ -133,6 +133,11 @@ func (p *debugParser) Uint64() uint64 { return x } +// Uint64 parses the next token as a sequence number. +func (p *debugParser) SeqNum() base.SeqNum { + return base.ParseSeqNum(p.Next()) +} + // FileNum parses the next token as a FileNum. func (p *debugParser) FileNum() base.FileNum { return base.FileNum(p.Int()) diff --git a/internal/manifest/version.go b/internal/manifest/version.go index b584899dcd..607bc0346c 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -37,9 +37,9 @@ type TableInfo struct { // Largest is the largest internal key in the table. 
Largest InternalKey // SmallestSeqNum is the smallest sequence number in the table. - SmallestSeqNum uint64 + SmallestSeqNum base.SeqNum // LargestSeqNum is the largest sequence number in the table. - LargestSeqNum uint64 + LargestSeqNum base.SeqNum } // TableStats contains statistics on a table used for compaction heuristics, @@ -196,14 +196,14 @@ type FileMetadata struct { // LargestSeqNumAbsolute will be at least as high as the pre-zeroing // sequence number. LargestSeqNumAbsolute is NOT durably persisted, so after // a database restart it takes on the value of LargestSeqNum. - LargestSeqNumAbsolute uint64 + LargestSeqNumAbsolute base.SeqNum // Lower and upper bounds for the smallest and largest sequence numbers in // the table, across both point and range keys. For physical sstables, these // values are tight bounds. For virtual sstables, there is no guarantee that // there will be keys with SmallestSeqNum or LargestSeqNum within virtual // sstable bounds. - SmallestSeqNum uint64 - LargestSeqNum uint64 + SmallestSeqNum base.SeqNum + LargestSeqNum base.SeqNum // SmallestPointKey and LargestPointKey are the inclusive bounds for the // internal point keys stored in the table. This includes RANGEDELs, which // alter point keys. @@ -787,9 +787,9 @@ func ParseFileMetadataDebug(s string) (_ *FileMetadata, err error) { switch field { case "seqnums": p.Expect("[") - m.SmallestSeqNum = p.Uint64() + m.SmallestSeqNum = p.SeqNum() p.Expect("-") - m.LargestSeqNum = p.Uint64() + m.LargestSeqNum = p.SeqNum() p.Expect("]") m.LargestSeqNumAbsolute = m.LargestSeqNum diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index adb6315fb5..99e428a090 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -116,7 +116,7 @@ type VersionEdit struct { // LastSeqNum is an upper bound on the sequence numbers that have been // assigned in flushed WALs. Unflushed WALs (that will be replayed during // recovery) may contain sequence numbers greater than this value. - LastSeqNum uint64 + LastSeqNum base.SeqNum // A file num may be present in both deleted files and new files when it // is moved from a lower level to a higher level (when the compaction @@ -200,7 +200,7 @@ func (v *VersionEdit) Decode(r io.Reader) error { if err != nil { return err } - v.LastSeqNum = n + v.LastSeqNum = base.SeqNum(n) case tagCompactPointer: if _, err := d.readLevel(); err != nil { @@ -324,17 +324,19 @@ func (v *VersionEdit) Decode(r io.Reader) error { return err } } - var smallestSeqNum uint64 - var largestSeqNum uint64 + var smallestSeqNum base.SeqNum + var largestSeqNum base.SeqNum if tag != tagNewFile { - smallestSeqNum, err = d.readUvarint() + n, err := d.readUvarint() if err != nil { return err } - largestSeqNum, err = d.readUvarint() + smallestSeqNum = base.SeqNum(n) + n, err = d.readUvarint() if err != nil { return err } + largestSeqNum = base.SeqNum(n) } var markedForCompaction bool var creationTime uint64 @@ -634,7 +636,7 @@ func (v *VersionEdit) Encode(w io.Writer) error { // ComparerName is set. 
if v.LastSeqNum != 0 || v.ComparerName != "" { e.writeUvarint(tagLastSequence) - e.writeUvarint(v.LastSeqNum) + e.writeUvarint(uint64(v.LastSeqNum)) } for x := range v.DeletedFiles { e.writeUvarint(tagDeletedFile) @@ -682,8 +684,8 @@ func (v *VersionEdit) Encode(w io.Writer) error { e.writeKey(x.Meta.SmallestRangeKey) e.writeKey(x.Meta.LargestRangeKey) } - e.writeUvarint(x.Meta.SmallestSeqNum) - e.writeUvarint(x.Meta.LargestSeqNum) + e.writeUvarint(uint64(x.Meta.SmallestSeqNum)) + e.writeUvarint(uint64(x.Meta.LargestSeqNum)) if customFields { if x.Meta.CreationTime != 0 { e.writeUvarint(customTagCreationTime) diff --git a/internal/manifest/version_edit_test.go b/internal/manifest/version_edit_test.go index 009e05cbfd..286be268ac 100644 --- a/internal/manifest/version_edit_test.go +++ b/internal/manifest/version_edit_test.go @@ -421,7 +421,7 @@ func TestVersionEditEncodeLastSeqNum(t *testing.T) { } val, err := d.readUvarint() require.NoError(t, err) - if c.edit.LastSeqNum != val { + if c.edit.LastSeqNum != base.SeqNum(val) { t.Fatalf("expected %d, but found %d", c.edit.LastSeqNum, val) } } diff --git a/internal/manifest/version_test.go b/internal/manifest/version_test.go index 7feaabf3b6..d0580cc578 100644 --- a/internal/manifest/version_test.go +++ b/internal/manifest/version_test.go @@ -658,9 +658,9 @@ func TestCalculateInuseKeyRangesRandomized(t *testing.T) { return []byte{byte(i/26 + 'a'), byte(i%26 + 'a')} } makeIK := func(level, i int) InternalKey { - seqNum := uint64(NumLevels-level) * 100 + seqNum := base.SeqNum(NumLevels-level) * 100 if level == 0 { - seqNum += uint64(i) + seqNum += base.SeqNum(i) } return base.MakeInternalKey( makeUserKey(i), diff --git a/internal/rangekey/coalesce.go b/internal/rangekey/coalesce.go index 84aa7029f2..db7e7ea2aa 100644 --- a/internal/rangekey/coalesce.go +++ b/internal/rangekey/coalesce.go @@ -66,7 +66,7 @@ func Coalesce(cmp base.Compare, eq base.Equal, keys []keyspan.Key, dst *[]keyspa // CoalesceIntoKeysBySuffix is a variant of Coalesce which outputs the results into // keyspan.KeysBySuffix without sorting them. func CoalesceIntoKeysBySuffix( - equal base.Equal, keysBySuffix *keyspan.KeysBySuffix, snapshot uint64, keys []keyspan.Key, + equal base.Equal, keysBySuffix *keyspan.KeysBySuffix, snapshot base.SeqNum, keys []keyspan.Key, ) { // First, enforce visibility and RangeKeyDelete mechanics. We only need to // consider the prefix of keys before and including the first @@ -160,7 +160,7 @@ func CoalesceIntoKeysBySuffix( // rest of the iterator stack expects. type ForeignSSTTransformer struct { Equal base.Equal - SeqNum uint64 + SeqNum base.SeqNum sortBuf keyspan.KeysBySuffix } diff --git a/internal/rangekey/rangekey.go b/internal/rangekey/rangekey.go index 1ac71059a0..2f834c57b2 100644 --- a/internal/rangekey/rangekey.go +++ b/internal/rangekey/rangekey.go @@ -91,7 +91,7 @@ func (e *Encoder) Encode(s *keyspan.Span) error { // sequence number descending, grouping them into sequence numbers. All keys // with identical sequence numbers are flushed together. var del bool - var seqNum uint64 + var seqNum base.SeqNum for i := range s.Keys { if i == 0 || s.Keys[i].SeqNum() != seqNum { if i > 0 { @@ -127,7 +127,7 @@ func (e *Encoder) Encode(s *keyspan.Span) error { // flush constructs internal keys for accumulated key state, and emits the // internal keys. 
-func (e *Encoder) flush(s *keyspan.Span, seqNum uint64, del bool) error { +func (e *Encoder) flush(s *keyspan.Span, seqNum base.SeqNum, del bool) error { if len(e.sets) > 0 { ik := base.MakeInternalKey(s.Start, seqNum, base.InternalKeyKindRangeKeySet) l := EncodedSetValueLen(s.End, e.sets) diff --git a/internal/rangekeystack/user_iterator.go b/internal/rangekeystack/user_iterator.go index 8397a62382..510a847600 100644 --- a/internal/rangekeystack/user_iterator.go +++ b/internal/rangekeystack/user_iterator.go @@ -52,7 +52,7 @@ import ( // │ // ╰── .Next type UserIteratorConfig struct { - snapshot uint64 + snapshot base.SeqNum comparer *base.Comparer miter keyspanimpl.MergingIter biter keyspan.BoundedIter @@ -88,7 +88,7 @@ func (bufs *Buffers) PrepareForReuse() { // keys not visible at the provided snapshot are ignored. func (ui *UserIteratorConfig) Init( comparer *base.Comparer, - snapshot uint64, + snapshot base.SeqNum, lower, upper []byte, hasPrefix *bool, prefix *[]byte, diff --git a/internal/rangekeystack/user_iterator_test.go b/internal/rangekeystack/user_iterator_test.go index c87f1327b1..e4c84d3bfa 100644 --- a/internal/rangekeystack/user_iterator_test.go +++ b/internal/rangekeystack/user_iterator_test.go @@ -10,7 +10,6 @@ import ( "io" "math" "math/rand" - "strconv" "strings" "testing" "time" @@ -38,9 +37,7 @@ func TestIter(t *testing.T) { visibleSeqNum := base.InternalKeySeqNumMax for _, arg := range td.CmdArgs { if arg.Key == "visible-seq-num" { - var err error - visibleSeqNum, err = strconv.ParseUint(arg.Vals[0], 10, 64) - require.NoError(t, err) + visibleSeqNum = base.ParseSeqNum(arg.Vals[0]) } } @@ -177,7 +174,7 @@ func testDefragmentingIteRandomizedOnce(t *testing.T, seed int64) { } key := keyspan.Key{ - Trailer: base.MakeTrailer(uint64(i), base.InternalKeyKindRangeKeySet), + Trailer: base.MakeTrailer(base.SeqNum(i), base.InternalKeyKindRangeKeySet), Value: []byte(fmt.Sprintf("v%d", rng.Intn(3))), } // Generate suffixes 0, 1, 2, or 3 with 0 indicating none. @@ -375,7 +372,7 @@ func BenchmarkTransform(b *testing.B) { var keys []keyspan.Key for k := 0; k < n; k++ { keys = append(keys, keyspan.Key{ - Trailer: base.MakeTrailer(uint64(n-k), base.InternalKeyKindRangeKeySet), + Trailer: base.MakeTrailer(base.SeqNum(n-k), base.InternalKeyKindRangeKeySet), Suffix: suffixes[k], }) } diff --git a/iterator.go b/iterator.go index 76e45b066c..551adb5119 100644 --- a/iterator.go +++ b/iterator.go @@ -247,12 +247,12 @@ type Iterator struct { newIters tableNewIters newIterRangeKey keyspanimpl.TableNewSpanIter lazyCombinedIter lazyCombinedIter - seqNum uint64 + seqNum base.SeqNum // batchSeqNum is used by Iterators over indexed batches to detect when the // underlying batch has been mutated. The batch beneath an indexed batch may // be mutated while the Iterator is open, but new keys are not surfaced // until the next call to SetOptions. - batchSeqNum uint64 + batchSeqNum base.SeqNum // batch{PointIter,RangeDelIter,RangeKeyIter} are used when the Iterator is // configured to read through an indexed batch. If a batch is set, these // iterators will be included within the iterator stack regardless of @@ -2608,7 +2608,7 @@ func (i *Iterator) SetOptions(o *IterOptions) { // iterator or range-key iterator but we require one, it'll be created in // the slow path that reconstructs the iterator in finishInitializingIter. 
if i.batch != nil { - nextBatchSeqNum := (uint64(len(i.batch.data)) | base.InternalKeySeqNumBatch) + nextBatchSeqNum := (base.SeqNum(len(i.batch.data)) | base.InternalKeySeqNumBatch) if nextBatchSeqNum != i.batchSeqNum { i.batchSeqNum = nextBatchSeqNum if i.merging != nil { @@ -2867,7 +2867,7 @@ func (i *Iterator) CloneWithContext(ctx context.Context, opts CloneOptions) (*It // If the caller requested the clone have a current view of the indexed // batch, set the clone's batch sequence number appropriately. if i.batch != nil && opts.RefreshBatchView { - dbi.batchSeqNum = (uint64(len(i.batch.data)) | base.InternalKeySeqNumBatch) + dbi.batchSeqNum = (base.SeqNum(len(i.batch.data)) | base.InternalKeySeqNumBatch) } return finishInitializingIter(ctx, buf), nil diff --git a/iterator_histories_test.go b/iterator_histories_test.go index 21f781633e..f5de4658d1 100644 --- a/iterator_histories_test.go +++ b/iterator_histories_test.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/sstable" @@ -337,10 +338,7 @@ func TestIterHistories(t *testing.T) { return uint64(v) < min || uint64(v) >= max } case "snapshot": - s, err := strconv.ParseUint(arg.Vals[0], 10, 64) - if err != nil { - return err.Error() - } + s := base.ParseSeqNum(arg.Vals[0]) func() { d.mu.Lock() defer d.mu.Unlock() diff --git a/iterator_test.go b/iterator_test.go index a1af998ffd..3576a42a4f 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -166,7 +166,7 @@ func TestIterator(t *testing.T) { var merge Merge var kvs []base.InternalKV - newIter := func(seqNum uint64, opts IterOptions) *Iterator { + newIter := func(seqNum base.SeqNum, opts IterOptions) *Iterator { if merge == nil { merge = DefaultMerger.Merge } @@ -218,7 +218,7 @@ func TestIterator(t *testing.T) { opts.UpperBound = []byte(upper) } - iter := newIter(seqNum, opts) + iter := newIter(base.SeqNum(seqNum), opts) iterOutput := runIterCmd(d, iter, true) stats := iter.Stats() return fmt.Sprintf("%sstats: %s\n", iterOutput, stats.String()) @@ -230,7 +230,7 @@ func TestIterator(t *testing.T) { } type minSeqNumPropertyCollector struct { - minSeqNum uint64 + minSeqNum base.SeqNum } var _ BlockPropertyCollector = (*minSeqNumPropertyCollector)(nil) @@ -257,7 +257,7 @@ func (c *minSeqNumPropertyCollector) FinishIndexBlock(buf []byte) ([]byte, error } func (c *minSeqNumPropertyCollector) FinishTable(buf []byte) ([]byte, error) { - return binary.AppendUvarint(buf, c.minSeqNum), nil + return binary.AppendUvarint(buf, uint64(c.minSeqNum)), nil } func (c *minSeqNumPropertyCollector) AddCollectedWithSuffixReplacement( @@ -570,7 +570,11 @@ func TestIteratorNextPrev(t *testing.T) { db: d, seqNum: InternalKeySeqNumMax, } - td.MaybeScanArgs(t, "seq", &snap.seqNum) + if td.HasArg("seq") { + var n uint64 + td.ScanArgs(t, "seq", &n) + snap.seqNum = base.SeqNum(n) + } iter, _ := snap.NewIter(nil) return runIterCmd(td, iter, true) diff --git a/level_checker.go b/level_checker.go index 1f24272c3d..4a404c4cb5 100644 --- a/level_checker.go +++ b/level_checker.go @@ -56,7 +56,7 @@ type simpleMergingIterLevel struct { type simpleMergingIter struct { levels []simpleMergingIterLevel - snapshot uint64 + snapshot base.SeqNum heap simpleMergingIterHeap // The last point's key and level. For validation. 
lastKey InternalKey @@ -74,7 +74,7 @@ type simpleMergingIter struct { func (m *simpleMergingIter) init( merge Merge, cmp Compare, - snapshot uint64, + snapshot base.SeqNum, formatKey base.FormatKey, levels ...simpleMergingIterLevel, ) { @@ -358,7 +358,7 @@ type checkConfig struct { comparer *Comparer readState *readState newIters tableNewIters - seqNum uint64 + seqNum base.SeqNum stats *CheckLevelsStats merge Merge formatKey base.FormatKey @@ -448,7 +448,7 @@ func addTombstonesFromIter( lsmLevel int, fileNum FileNum, tombstones []tombstoneWithLevel, - seqNum uint64, + seqNum base.SeqNum, cmp Compare, formatKey base.FormatKey, ) (_ []tombstoneWithLevel, err error) { diff --git a/mem_table.go b/mem_table.go index d1ed16e289..5b035b6fc3 100644 --- a/mem_table.go +++ b/mem_table.go @@ -85,7 +85,7 @@ type memTable struct { rangeKeys keySpanCache // The current logSeqNum at the time the memtable was created. This is // guaranteed to be less than or equal to any seqnum stored in the memtable. - logSeqNum uint64 + logSeqNum base.SeqNum releaseAccountingReservation func() } @@ -104,7 +104,7 @@ type memTableOptions struct { *Options arenaBuf []byte size int - logSeqNum uint64 + logSeqNum base.SeqNum releaseAccountingReservation func() } @@ -201,7 +201,7 @@ func (m *memTable) prepare(batch *Batch) error { return nil } -func (m *memTable) apply(batch *Batch, seqNum uint64) error { +func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error { if seqNum < m.logSeqNum { return base.CorruptionErrorf("pebble: batch seqnum %d is less than memtable creation seqnum %d", errors.Safe(seqNum), errors.Safe(m.logSeqNum)) @@ -239,9 +239,9 @@ func (m *memTable) apply(batch *Batch, seqNum uint64) error { return err } } - if seqNum != startSeqNum+uint64(batch.Count()) { + if seqNum != startSeqNum+base.SeqNum(batch.Count()) { return base.CorruptionErrorf("pebble: inconsistent batch count: %d vs %d", - errors.Safe(seqNum), errors.Safe(startSeqNum+uint64(batch.Count()))) + errors.Safe(seqNum), errors.Safe(startSeqNum+base.SeqNum(batch.Count()))) } if tombstoneCount != 0 { m.tombstones.invalidate(tombstoneCount) diff --git a/mem_table_test.go b/mem_table_test.go index 3d55fd729a..897bb9d722 100644 --- a/mem_table_test.go +++ b/mem_table_test.go @@ -10,7 +10,6 @@ import ( "fmt" "strconv" "strings" - "sync/atomic" "testing" "time" "unicode" @@ -276,7 +275,7 @@ func TestMemTableIter(t *testing.T) { func TestMemTableDeleteRange(t *testing.T) { var mem *memTable - var seqNum uint64 + var seqNum base.SeqNum datadriven.RunTest(t, "testdata/delete_range", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { @@ -296,7 +295,7 @@ func TestMemTableDeleteRange(t *testing.T) { if err := mem.apply(b, seqNum); err != nil { return err.Error() } - seqNum += uint64(b.Count()) + seqNum += base.SeqNum(b.Count()) return "" case "scan": @@ -327,7 +326,7 @@ func TestMemTableConcurrentDeleteRange(t *testing.T) { const workers = 10 eg, _ := errgroup.WithContext(context.Background()) - var seqNum atomic.Uint64 + var seqNum base.AtomicSeqNum seqNum.Store(1) for i := 0; i < workers; i++ { i := i @@ -416,7 +415,7 @@ func TestMemTable(t *testing.T) { var seqNum uint64 td.ScanArgs(t, "name", &name) td.ScanArgs(t, "seq", &seqNum) - if err := m.apply(batches[name], seqNum); err != nil { + if err := m.apply(batches[name], base.SeqNum(seqNum)); err != nil { return err.Error() } delete(batches, name) diff --git a/merging_iter.go b/merging_iter.go index aef5856509..f65491fc23 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -228,8 
+228,8 @@ type mergingIter struct { logger Logger split Split dir int - snapshot uint64 - batchSnapshot uint64 + snapshot base.SeqNum + batchSnapshot base.SeqNum levels []mergingIterLevel heap mergingIterHeap err error diff --git a/merging_iter_test.go b/merging_iter_test.go index 4a141af3cd..866b6f6c8b 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -585,14 +585,14 @@ func buildLevelsForMergingIterSeqSeek( } for j := 1; j < len(files); j++ { for _, k := range []int{0, len(keys) - 1} { - ikey := base.MakeInternalKey(keys[k], uint64(j), InternalKeyKindSet) + ikey := base.MakeInternalKey(keys[k], base.SeqNum(j), InternalKeyKindSet) writers[j][0].Add(ikey, nil) } } lastKey := []byte(fmt.Sprintf("%08d", i)) keys = append(keys, lastKey) for j := 0; j < len(files); j++ { - lastIKey := base.MakeInternalKey(lastKey, uint64(j), InternalKeyKindSet) + lastIKey := base.MakeInternalKey(lastKey, base.SeqNum(j), InternalKeyKindSet) writers[j][1].Add(lastIKey, nil) } for _, levelWriters := range writers { diff --git a/metamorphic/ops.go b/metamorphic/ops.go index daf132f28a..4944403908 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -1926,7 +1926,7 @@ func (r *replicateOp) runSharedReplicate( } return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val) }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { return w.DeleteRange(start, end) }, func(start, end []byte, keys []keyspan.Key) error { @@ -1990,7 +1990,7 @@ func (r *replicateOp) runExternalReplicate( } return w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val) }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { return w.DeleteRange(start, end) }, func(start, end []byte, keys []keyspan.Key) error { diff --git a/metrics.go b/metrics.go index 1d997eedfc..6d2d60ce1c 100644 --- a/metrics.go +++ b/metrics.go @@ -239,7 +239,7 @@ type Metrics struct { // The number of currently open snapshots. Count int // The sequence number of the earliest, currently open snapshot. - EarliestSeqNum uint64 + EarliestSeqNum base.SeqNum // A running tally of keys written to sstables during flushes or // compactions that would've been elided if it weren't for open // snapshots. diff --git a/open.go b/open.go index cebfb23546..20aa384cf6 100644 --- a/open.go +++ b/open.go @@ -758,7 +758,7 @@ func GetVersion(dir string, fs vfs.FS) (string, error) { // re-acquired during the course of this method. func (d *DB) replayWAL( jobID JobID, ve *versionEdit, ll wal.LogicalLog, strictWALTail bool, -) (toFlush flushableList, maxSeqNum uint64, err error) { +) (toFlush flushableList, maxSeqNum base.SeqNum, err error) { rr := ll.OpenForRead() defer rr.Close() var ( @@ -805,7 +805,7 @@ func (d *DB) replayWAL( mem, entry = nil, nil } // Creates a new memtable if there is no current memtable. - ensureMem := func(seqNum uint64) { + ensureMem := func(seqNum base.SeqNum) { if mem != nil { return } @@ -874,7 +874,7 @@ func (d *DB) replayWAL( b.db = d b.SetRepr(buf.Bytes()) seqNum := b.SeqNum() - maxSeqNum = seqNum + uint64(b.Count()) + maxSeqNum = seqNum + base.SeqNum(b.Count()) keysReplayed += int64(b.Count()) batchesReplayed++ { diff --git a/options.go b/options.go index 39643cca7e..17ff484743 100644 --- a/options.go +++ b/options.go @@ -204,7 +204,7 @@ type IterOptions struct { // snapshotForHideObsoletePoints is specified for/by levelIter when opening // files and is used to decide whether to hide obsolete points. 
A value of 0 // implies obsolete points should not be hidden. - snapshotForHideObsoletePoints uint64 + snapshotForHideObsoletePoints base.SeqNum // NB: If adding new Options, you must account for them in iterator // construction and Iterator.SetOptions. @@ -264,7 +264,7 @@ type scanInternalOptions struct { IterOptions visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error - visitRangeDel func(start, end []byte, seqNum uint64) error + visitRangeDel func(start, end []byte, seqNum SeqNum) error visitRangeKey func(start, end []byte, keys []rangekey.Key) error visitSharedFile func(sst *SharedSSTMeta) error visitExternalFile func(sst *ExternalFile) error diff --git a/range_del_test.go b/range_del_test.go index 91581f47dc..d58916f052 100644 --- a/range_del_test.go +++ b/range_del_test.go @@ -80,7 +80,11 @@ func TestRangeDel(t *testing.T) { db: d, seqNum: InternalKeySeqNumMax, } - td.MaybeScanArgs(t, "seq", &snap.seqNum) + if td.HasArg("seq") { + var n uint64 + td.ScanArgs(t, "seq", &n) + snap.seqNum = base.SeqNum(n) + } iter, _ := snap.NewIter(nil) return runIterCmd(td, iter, true) diff --git a/scan_internal.go b/scan_internal.go index fcec4ab80f..bb7e52e54b 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -126,7 +126,7 @@ type pointCollapsingIterator struct { comparer *base.Comparer merge base.Merge err error - seqNum uint64 + seqNum base.SeqNum // The current position of `iter`. Always owned by the underlying iter. iterKV *base.InternalKV // The last saved key. findNextEntry and similar methods are expected to save @@ -144,7 +144,7 @@ type pointCollapsingIterator struct { savedKeyBuf []byte // If fixedSeqNum is non-zero, all emitted points are verified to have this // fixed sequence number. - fixedSeqNum uint64 + fixedSeqNum base.SeqNum } func (p *pointCollapsingIterator) Span() *keyspan.Span { @@ -423,7 +423,7 @@ type scanInternalIterator struct { alloc *iterAlloc newIters tableNewIters newIterRangeKey keyspanimpl.TableNewSpanIter - seqNum uint64 + seqNum base.SeqNum iterLevels []IteratorLevel mergingIter *mergingIter diff --git a/scan_internal_test.go b/scan_internal_test.go index 305bb7a2b9..4ecfd1fbf8 100644 --- a/scan_internal_test.go +++ b/scan_internal_test.go @@ -217,7 +217,7 @@ func TestScanInternal(t *testing.T) { categoryAndQoS sstable.CategoryAndQoS, lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeDel func(start, end []byte, seqNum base.SeqNum) error, visitRangeKey func(start, end []byte, keys []keyspan.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, visitExternalFile func(sst *ExternalFile) error, @@ -565,7 +565,7 @@ func TestScanInternal(t *testing.T) { fmt.Fprintf(&b, "%s (%s)\n", key, v) return nil }, - func(start, end []byte, seqNum uint64) error { + func(start, end []byte, seqNum base.SeqNum) error { fmt.Fprintf(&b, "%s-%s#%d,RANGEDEL\n", start, end, seqNum) return nil }, diff --git a/snapshot.go b/snapshot.go index 519c976dc7..1be35931c4 100644 --- a/snapshot.go +++ b/snapshot.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/pebble/sstable" @@ -20,7 +21,7 @@ import ( type Snapshot struct { // The db the snapshot was created from. db *DB - seqNum uint64 + seqNum base.SeqNum // Set if part of an EventuallyFileOnlySnapshot. 
efos *EventuallyFileOnlySnapshot @@ -77,7 +78,7 @@ func (s *Snapshot) ScanInternal( categoryAndQoS sstable.CategoryAndQoS, lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeDel func(start, end []byte, seqNum base.SeqNum) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, visitExternalFile func(sst *ExternalFile) error, @@ -162,19 +163,19 @@ func (l *snapshotList) count() int { return count } -func (l *snapshotList) earliest() uint64 { - v := uint64(math.MaxUint64) +func (l *snapshotList) earliest() base.SeqNum { + v := base.SeqNum(math.MaxUint64) if !l.empty() { v = l.root.next.seqNum } return v } -func (l *snapshotList) toSlice() []uint64 { +func (l *snapshotList) toSlice() []base.SeqNum { if l.empty() { return nil } - var results []uint64 + var results []base.SeqNum for i := l.root.next; i != &l.root; i = i.next { results = append(results, i.seqNum) } @@ -250,7 +251,7 @@ type EventuallyFileOnlySnapshot struct { // The db the snapshot was created from. db *DB - seqNum uint64 + seqNum base.SeqNum closed chan struct{} } @@ -475,7 +476,7 @@ func (es *EventuallyFileOnlySnapshot) ScanInternal( categoryAndQoS sstable.CategoryAndQoS, lower, upper []byte, visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error, - visitRangeDel func(start, end []byte, seqNum uint64) error, + visitRangeDel func(start, end []byte, seqNum base.SeqNum) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *SharedSSTMeta) error, visitExternalFile func(sst *ExternalFile) error, diff --git a/snapshot_test.go b/snapshot_test.go index b9d8f14fcb..cb3b56a725 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -17,18 +17,19 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) func TestSnapshotListToSlice(t *testing.T) { testCases := []struct { - vals []uint64 + vals []base.SeqNum }{ {nil}, - {[]uint64{1}}, - {[]uint64{1, 2, 3}}, - {[]uint64{3, 2, 1}}, + {[]base.SeqNum{1}}, + {[]base.SeqNum{1, 2, 3}}, + {[]base.SeqNum{3, 2, 1}}, } for _, c := range testCases { t.Run("", func(t *testing.T) { diff --git a/sstable/block_iter.go b/sstable/block_iter.go index 681494a3c0..c85723df35 100644 --- a/sstable/block_iter.go +++ b/sstable/block_iter.go @@ -436,23 +436,23 @@ func (i *blockIter) readFirstKey() error { // The sstable internal obsolete bit is set when writing a block and unset by // blockIter, so no code outside block writing/reading code ever sees it. -const trailerObsoleteBit = uint64(base.InternalKeyKindSSTableInternalObsoleteBit) -const trailerObsoleteMask = (InternalKeySeqNumMax << 8) | uint64(base.InternalKeyKindSSTableInternalObsoleteMask) +const trailerObsoleteBit = base.Trailer(base.InternalKeyKindSSTableInternalObsoleteBit) +const trailerObsoleteMask = (base.Trailer(InternalKeySeqNumMax) << 8) | base.Trailer(base.InternalKeyKindSSTableInternalObsoleteMask) func (i *blockIter) decodeInternalKey(key []byte) (hiddenPoint bool) { // Manually inlining base.DecodeInternalKey provides a 5-10% speedup on // BlockIter benchmarks. 
if n := len(key) - 8; n >= 0 { - trailer := binary.LittleEndian.Uint64(key[n:]) + trailer := base.Trailer(binary.LittleEndian.Uint64(key[n:])) hiddenPoint = i.transforms.HideObsoletePoints && (trailer&trailerObsoleteBit != 0) i.ikv.K.Trailer = trailer & trailerObsoleteMask i.ikv.K.UserKey = key[:n:n] if n := i.transforms.SyntheticSeqNum; n != 0 { - i.ikv.K.SetSeqNum(uint64(n)) + i.ikv.K.SetSeqNum(base.SeqNum(n)) } } else { - i.ikv.K.Trailer = uint64(InternalKeyKindInvalid) + i.ikv.K.Trailer = base.Trailer(InternalKeyKindInvalid) i.ikv.K.UserKey = nil } return hiddenPoint @@ -652,7 +652,7 @@ func (i *blockIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV if !hiddenPoint && i.cmp(i.ikv.K.UserKey, key) >= 0 { // Initialize i.lazyValue if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -935,7 +935,7 @@ func (i *blockIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV return nil } if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -964,7 +964,7 @@ func (i *blockIter) First() *base.InternalKV { } i.maybeReplaceSuffix() if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -1007,7 +1007,7 @@ func (i *blockIter) Last() *base.InternalKV { } i.maybeReplaceSuffix() if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -1043,13 +1043,13 @@ start: i.readEntry() // Manually inlined version of i.decodeInternalKey(i.key). if n := len(i.key) - 8; n >= 0 { - trailer := binary.LittleEndian.Uint64(i.key[n:]) + trailer := base.Trailer(binary.LittleEndian.Uint64(i.key[n:])) hiddenPoint := i.transforms.HideObsoletePoints && (trailer&trailerObsoleteBit != 0) i.ikv.K.Trailer = trailer & trailerObsoleteMask i.ikv.K.UserKey = i.key[:n:n] if n := i.transforms.SyntheticSeqNum; n != 0 { - i.ikv.K.SetSeqNum(uint64(n)) + i.ikv.K.SetSeqNum(base.SeqNum(n)) } if hiddenPoint { goto start @@ -1062,11 +1062,11 @@ start: i.ikv.K.UserKey = i.synthSuffixBuf } } else { - i.ikv.K.Trailer = uint64(InternalKeyKindInvalid) + i.ikv.K.Trailer = base.Trailer(InternalKeyKindInvalid) i.ikv.K.UserKey = nil } if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -1323,7 +1323,7 @@ func (i *blockIter) nextPrefixV3(succKey []byte) *base.InternalKV { // Manually inlined version of i.decodeInternalKey(i.key). 
hiddenPoint := false if n := len(i.key) - 8; n >= 0 { - trailer := binary.LittleEndian.Uint64(i.key[n:]) + trailer := base.Trailer(binary.LittleEndian.Uint64(i.key[n:])) hiddenPoint = i.transforms.HideObsoletePoints && (trailer&trailerObsoleteBit != 0) i.ikv.K = base.InternalKey{ @@ -1331,7 +1331,7 @@ func (i *blockIter) nextPrefixV3(succKey []byte) *base.InternalKV { UserKey: i.key[:n:n], } if n := i.transforms.SyntheticSeqNum; n != 0 { - i.ikv.K.SetSeqNum(uint64(n)) + i.ikv.K.SetSeqNum(base.SeqNum(n)) } if i.transforms.SyntheticSuffix.IsSet() { // Inlined version of i.maybeReplaceSuffix() @@ -1341,7 +1341,7 @@ func (i *blockIter) nextPrefixV3(succKey []byte) *base.InternalKV { i.ikv.K.UserKey = i.synthSuffixBuf } } else { - i.ikv.K.Trailer = uint64(InternalKeyKindInvalid) + i.ikv.K.Trailer = base.Trailer(InternalKeyKindInvalid) i.ikv.K.UserKey = nil } nextCmpCount++ @@ -1357,7 +1357,7 @@ func (i *blockIter) nextPrefixV3(succKey []byte) *base.InternalKV { if invariants.Enabled && !i.lazyValueHandling.hasValuePrefix { panic(errors.AssertionFailedf("nextPrefixV3 being run for non-v3 sstable")) } - if base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + if i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -1387,7 +1387,7 @@ start: // Manually inlined version of i.decodeInternalKey(i.key). i.key = i.cachedBuf[e.keyStart:e.keyEnd] if n := len(i.key) - 8; n >= 0 { - trailer := binary.LittleEndian.Uint64(i.key[n:]) + trailer := base.Trailer(binary.LittleEndian.Uint64(i.key[n:])) hiddenPoint := i.transforms.HideObsoletePoints && (trailer&trailerObsoleteBit != 0) if hiddenPoint { @@ -1398,7 +1398,7 @@ start: UserKey: i.key[:n:n], } if n := i.transforms.SyntheticSeqNum; n != 0 { - i.ikv.K.SetSeqNum(uint64(n)) + i.ikv.K.SetSeqNum(base.SeqNum(n)) } if i.transforms.SyntheticSuffix.IsSet() { // Inlined version of i.maybeReplaceSuffix() @@ -1410,12 +1410,12 @@ start: i.ikv.K.UserKey = i.synthSuffixBuf } } else { - i.ikv.K.Trailer = uint64(InternalKeyKindInvalid) + i.ikv.K.Trailer = base.Trailer(InternalKeyKindInvalid) i.ikv.K.UserKey = nil } i.cached = i.cached[:n] if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) @@ -1496,7 +1496,7 @@ start: i.ikv.K.UserKey = i.synthSuffixBuf } if !i.lazyValueHandling.hasValuePrefix || - base.TrailerKind(i.ikv.K.Trailer) != InternalKeyKindSet { + i.ikv.K.Kind() != InternalKeyKindSet { i.ikv.V = base.MakeInPlaceValue(i.val) } else if i.lazyValueHandling.vbr == nil || !isValueHandle(valuePrefix(i.val[0])) { i.ikv.V = base.MakeInPlaceValue(i.val[1:]) diff --git a/sstable/block_test.go b/sstable/block_test.go index a52538cab7..60b1bf2077 100644 --- a/sstable/block_test.go +++ b/sstable/block_test.go @@ -7,7 +7,6 @@ package sstable import ( "bytes" "fmt" - "strconv" "strings" "testing" "time" @@ -148,7 +147,7 @@ func TestInvalidInternalKeyDecoding(t *testing.T) { i := blockIter{} i.decodeInternalKey([]byte(tc)) require.Nil(t, i.ikv.K.UserKey) - require.Equal(t, uint64(InternalKeyKindInvalid), i.ikv.K.Trailer) + require.Equal(t, base.Trailer(InternalKeyKindInvalid), i.ikv.K.Trailer) } } @@ -224,11 +223,8 @@ func TestBlockIter(t *testing.T) { func 
TestBlockIter2(t *testing.T) { makeIkey := func(s string) InternalKey { j := strings.Index(s, ":") - seqNum, err := strconv.Atoi(s[j+1:]) - if err != nil { - panic(err) - } - return base.MakeInternalKey([]byte(s[:j]), uint64(seqNum), InternalKeyKindSet) + seqNum := base.ParseSeqNum(s[j+1:]) + return base.MakeInternalKey([]byte(s[:j]), seqNum, InternalKeyKindSet) } var block []byte diff --git a/sstable/layout.go b/sstable/layout.go index ec85d9a781..12af5147dc 100644 --- a/sstable/layout.go +++ b/sstable/layout.go @@ -218,7 +218,7 @@ func (l *Layout) Describe( // fetched from a blockIter which does not know about value // blocks. v := kv.InPlaceValue() - if base.TrailerKind(kv.K.Trailer) != InternalKeyKindSet { + if kv.K.Kind() != InternalKeyKindSet { fmtRecord(&kv.K, v) } else if !isValueHandle(valuePrefix(v[0])) { fmtRecord(&kv.K, v[1:]) diff --git a/sstable/random_test.go b/sstable/random_test.go index 10a29f923a..2e39abe9ca 100644 --- a/sstable/random_test.go +++ b/sstable/random_test.go @@ -328,7 +328,7 @@ func buildRandomSSTable(f vfs.File, cfg randomTableConfig) (*WriterMetadata, err type keyID struct { idx int64 suffix int64 - seqNum int64 + seqNum base.SeqNum } keyMap := make(map[keyID]bool) // Constrain the space we generate keys to the middle 90% of the keyspace. @@ -339,14 +339,14 @@ func buildRandomSSTable(f vfs.File, cfg randomTableConfig) (*WriterMetadata, err k := keyID{ idx: cfg.rng.Int63n(sstKeys.Count()), suffix: cfg.rng.Int63n(cfg.maxSuffix + 1), - seqNum: cfg.rng.Int63n(cfg.maxSeqNum + 1), + seqNum: base.SeqNum(cfg.rng.Int63n(cfg.maxSeqNum + 1)), } // If we've already generated this exact key, try again. for keyMap[k] { k = keyID{ idx: cfg.rng.Int63n(sstKeys.Count()), suffix: cfg.rng.Int63n(cfg.maxSuffix + 1), - seqNum: cfg.rng.Int63n(cfg.maxSeqNum + 1), + seqNum: base.SeqNum(cfg.rng.Int63n(cfg.maxSeqNum + 1)), } } keyMap[k] = true @@ -362,7 +362,7 @@ func buildRandomSSTable(f vfs.File, cfg randomTableConfig) (*WriterMetadata, err var keyBuf []byte alloc, keyBuf = alloc.Alloc(testkeys.SuffixLen(keyID.suffix) + cfg.keys.MaxLen()) n := testkeys.WriteKeyAt(keyBuf, sstKeys, keyID.idx, keyID.suffix) - keys[i] = base.MakeInternalKey(keyBuf[:n], uint64(keyID.seqNum), kind) + keys[i] = base.MakeInternalKey(keyBuf[:n], keyID.seqNum, kind) } // The Writer requires the keys to be written in sorted order. Sort them. slices.SortFunc(keys, func(a, b base.InternalKey) int { diff --git a/sstable/reader.go b/sstable/reader.go index f738c1cfba..53202848d6 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -328,8 +328,8 @@ func (r *Reader) NewIterWithBlockPropertyFiltersAndContextEtc( // before the call to NewIterWithBlockPropertyFiltersAndContextEtc, to get the // value of hideObsoletePoints and potentially add a block property filter. func (r *Reader) TryAddBlockPropertyFilterForHideObsoletePoints( - snapshotForHideObsoletePoints uint64, - fileLargestSeqNum uint64, + snapshotForHideObsoletePoints base.SeqNum, + fileLargestSeqNum base.SeqNum, pointKeyFilters []BlockPropertyFilter, ) (hideObsoletePoints bool, filters []BlockPropertyFilter) { hideObsoletePoints = r.tableFormat >= TableFormatPebblev4 && diff --git a/sstable/reader_common.go b/sstable/reader_common.go index e71d565bc2..f93759a60d 100644 --- a/sstable/reader_common.go +++ b/sstable/reader_common.go @@ -62,7 +62,7 @@ var NoTransforms = IterTransforms{} // SyntheticSeqNum is used to override all sequence numbers in a table. It is // set to a non-zero value when the table was created externally and ingested // whole. 
-type SyntheticSeqNum uint64 +type SyntheticSeqNum base.SeqNum // NoSyntheticSeqNum is the default zero value for SyntheticSeqNum, which // disables overriding the sequence number. diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go index 29649119c8..1671d55bcb 100644 --- a/sstable/reader_virtual.go +++ b/sstable/reader_virtual.go @@ -179,7 +179,7 @@ func (v *VirtualReader) NewRawRangeKeyIter( // transform iter into VirtualReader. transform := &rangekey.ForeignSSTTransformer{ Equal: v.reader.Equal, - SeqNum: uint64(syntheticSeqNum), + SeqNum: base.SeqNum(syntheticSeqNum), } transformIter := &keyspan.TransformerIter{ FragmentIterator: iter, diff --git a/sstable/testdata/virtual_reader_iter b/sstable/testdata/virtual_reader_iter index 4682331c3c..32769b27bc 100644 --- a/sstable/testdata/virtual_reader_iter +++ b/sstable/testdata/virtual_reader_iter @@ -198,9 +198,9 @@ h.SET.9:h point: [a#1,SET-h#9,SET] seqnums: [1-9] -virtualize lower=c.SET.3 upper=f.SET.1:ff +virtualize lower=c.SET.3 upper=f.SET.1 ---- -bounds: [c#3,SET-f#0,SET] +bounds: [c#3,SET-f#1,SET] iter set-bounds lower=d upper=e @@ -413,9 +413,9 @@ h.SET.9:h point: [a#1,SET-h#9,SET] seqnums: [1-9] -virtualize lower=c.SET.3 upper=f.SET.1:ff +virtualize lower=c.SET.3 upper=f.SET.1 ---- -bounds: [c#3,SET-f#0,SET] +bounds: [c#3,SET-f#1,SET] iter set-bounds lower=d upper=e diff --git a/sstable/writer.go b/sstable/writer.go index acf98b56b6..84b34f0be7 100644 --- a/sstable/writer.go +++ b/sstable/writer.go @@ -50,8 +50,8 @@ type WriterMetadata struct { HasPointKeys bool HasRangeDelKeys bool HasRangeKeys bool - SmallestSeqNum uint64 - LargestSeqNum uint64 + SmallestSeqNum base.SeqNum + LargestSeqNum base.SeqNum Properties Properties } @@ -103,7 +103,7 @@ func (m *WriterMetadata) SetLargestRangeKey(k InternalKey) { m.HasRangeKeys = true } -func (m *WriterMetadata) updateSeqNum(seqNum uint64) { +func (m *WriterMetadata) updateSeqNum(seqNum base.SeqNum) { if m.SmallestSeqNum > seqNum { m.SmallestSeqNum = seqNum } @@ -227,7 +227,7 @@ type Writer struct { } type pointKeyInfo struct { - trailer uint64 + trailer base.Trailer // Only computed when w.valueBlockWriter is not nil. userKeyLen int // prefixLen uses w.split, if not nil. 
Only computed when w.valueBlockWriter @@ -831,10 +831,10 @@ func (w *Writer) makeAddPointDecisionV3( if !w.meta.HasPointKeys { return false, false, false, nil } - keyKind := base.TrailerKind(key.Trailer) + keyKind := key.Trailer.Kind() prevPointUserKey := w.getLastPointUserKey() prevPointKey := InternalKey{UserKey: prevPointUserKey, Trailer: prevPointKeyInfo.trailer} - prevKeyKind := base.TrailerKind(prevPointKeyInfo.trailer) + prevKeyKind := prevPointKeyInfo.trailer.Kind() considerWriteToValueBlock := prevKeyKind == InternalKeyKindSet && keyKind == InternalKeyKindSet if considerWriteToValueBlock && !w.requiredInPlaceValueBound.IsEmpty() { @@ -951,7 +951,7 @@ func (w *Writer) addPoint(key InternalKey, value []byte, forceObsolete bool) err maxSharedKeyLen = w.lastPointKeyInfo.prefixLen setHasSameKeyPrefix, writeToValueBlock, isObsolete, err = w.makeAddPointDecisionV3(key, len(value)) - addPrefixToValueStoredWithKey = base.TrailerKind(key.Trailer) == InternalKeyKindSet + addPrefixToValueStoredWithKey = key.Kind() == InternalKeyKindSet } else { err = w.makeAddPointDecisionV2(key) } diff --git a/sstable/writer_test.go b/sstable/writer_test.go index b3b8a1d2f0..1d72f87e43 100644 --- a/sstable/writer_test.go +++ b/sstable/writer_test.go @@ -951,7 +951,7 @@ func TestWriterRace(t *testing.T) { for ki := 0; ki < len(keys); ki++ { require.NoError( t, - w.Add(base.MakeInternalKey(keys[ki], uint64(ki), InternalKeyKindSet), val), + w.Add(base.MakeInternalKey(keys[ki], base.SeqNum(ki), InternalKeyKindSet), val), ) require.Equal( t, w.dataBlockBuf.dataBlock.getCurKey().UserKey, keys[ki], diff --git a/table_stats.go b/table_stats.go index c5b9bb281e..b04b439e4b 100644 --- a/table_stats.go +++ b/table_stats.go @@ -377,7 +377,7 @@ func (d *DB) loadTableRangeDelStats( start, end := s.Start, s.End // We only need to consider deletion size estimates for tables that contain // RANGEDELs. - var maxRangeDeleteSeqNum uint64 + var maxRangeDeleteSeqNum base.SeqNum for _, k := range s.Keys { if k.Kind() == base.InternalKeyKindRangeDelete && maxRangeDeleteSeqNum < k.SeqNum() { maxRangeDeleteSeqNum = k.SeqNum() @@ -540,7 +540,7 @@ func (d *DB) estimateSizesBeneath( func (d *DB) estimateReclaimedSizeBeneath( v *version, level int, start, end []byte, hintType deleteCompactionHintType, -) (estimate uint64, hintSeqNum uint64, err error) { +) (estimate uint64, hintSeqNum base.SeqNum, err error) { // Find all files in lower levels that overlap with the deleted range // [start, end). 
// diff --git a/tool/db.go b/tool/db.go index 184ceaf859..66018f5226 100644 --- a/tool/db.go +++ b/tool/db.go @@ -819,8 +819,8 @@ func propArgs(props []props, getProp func(*props) interface{}) []interface{} { type props struct { Count uint64 - SmallestSeqNum uint64 - LargestSeqNum uint64 + SmallestSeqNum base.SeqNum + LargestSeqNum base.SeqNum DataSize uint64 FilterSize uint64 IndexSize uint64 diff --git a/tool/lsm.go b/tool/lsm.go index f1e8b1679c..a5dc528afc 100644 --- a/tool/lsm.go +++ b/tool/lsm.go @@ -27,8 +27,8 @@ type lsmFileMetadata struct { Size uint64 Smallest int // ID of smallest key Largest int // ID of largest key - SmallestSeqNum uint64 - LargestSeqNum uint64 + SmallestSeqNum base.SeqNum + LargestSeqNum base.SeqNum Virtual bool } @@ -45,7 +45,7 @@ type lsmVersionEdit struct { type lsmKey struct { Pretty string - SeqNum uint64 + SeqNum base.SeqNum Kind int } diff --git a/tool/util.go b/tool/util.go index e315424a25..6747a1bff7 100644 --- a/tool/util.go +++ b/tool/util.go @@ -254,7 +254,7 @@ func formatKey(w io.Writer, fmtKey keyFormatter, key *base.InternalKey) bool { return true } -func formatSeqNumRange(w io.Writer, start, end uint64) { +func formatSeqNumRange(w io.Writer, start, end base.SeqNum) { fmt.Fprintf(w, "<#%d-#%d>", start, end) } diff --git a/tool/wal.go b/tool/wal.go index 911634a236..b52db349f6 100644 --- a/tool/wal.go +++ b/tool/wal.go @@ -153,7 +153,7 @@ func (w *walT) runDump(cmd *cobra.Command, args []string) { case base.InternalKeyKindRangeDelete: fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value)) case base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete: - ik := base.MakeInternalKey(ukey, b.SeqNum()+uint64(idx), kind) + ik := base.MakeInternalKey(ukey, b.SeqNum()+base.SeqNum(idx), kind) s, err := rangekey.Decode(ik, value, nil) if err != nil { fmt.Fprintf(stdout, "%s: error decoding %s", w.fmtKey.fn(ukey), err) diff --git a/version_set.go b/version_set.go index 5f7130ed60..c0ec1758fb 100644 --- a/version_set.go +++ b/version_set.go @@ -43,13 +43,13 @@ type versionList = manifest.VersionList // to the MANIFEST file, which is replayed at startup. type versionSet struct { // Next seqNum to use for WAL writes. - logSeqNum atomic.Uint64 + logSeqNum base.AtomicSeqNum // The upper bound on sequence numbers that have been assigned so far. A // suffix of these sequence numbers may not have been written to a WAL. Both // logSeqNum and visibleSeqNum are atomically updated by the commitPipeline. // visibleSeqNum is <= logSeqNum. - visibleSeqNum atomic.Uint64 + visibleSeqNum base.AtomicSeqNum // Number of bytes present in sstables being written by in-progress // compactions. This value will be zero if there are no in-progress diff --git a/version_set_test.go b/version_set_test.go index 211930b8c8..fda6eb47e7 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -298,7 +298,7 @@ func TestVersionSetSeqNums(t *testing.T) { require.NotNil(t, manifest) defer manifest.Close() rr := record.NewReader(manifest, 0 /* logNum */) - lastSeqNum := uint64(0) + var lastSeqNum base.SeqNum for { r, err := rr.Next() if err == io.EOF { @@ -313,7 +313,7 @@ func TestVersionSetSeqNums(t *testing.T) { } } // 2 ingestions happened, so LastSeqNum should equal base.SeqNumStart + 1. - require.Equal(t, uint64(11), lastSeqNum) + require.Equal(t, base.SeqNum(11), lastSeqNum) // logSeqNum is always one greater than the last assigned sequence number. 
require.Equal(t, d.mu.versions.logSeqNum.Load(), lastSeqNum+1) } diff --git a/wal/reader.go b/wal/reader.go index 745fe1698a..79456b4706 100644 --- a/wal/reader.go +++ b/wal/reader.go @@ -227,7 +227,7 @@ type virtualWALReader struct { // should monotonically increase as we iterate over the WAL files. If we // ever observe a batch encoding a sequence number <= lastSeqNum, we must // have already returned the batch and should skip it. - lastSeqNum uint64 + lastSeqNum base.SeqNum // recordBuf is a buffer used to hold the latest record read from a physical // file, and then returned to the user. A pointer to this buffer is returned // directly to the caller of NextRecord. diff --git a/wal/reader_test.go b/wal/reader_test.go index 18f8378c09..2f902e23f0 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -143,7 +143,7 @@ func TestReader(t *testing.T) { count := uint32(fields.MustKeyValue("count").Uint64()) seq = fields.MustKeyValue("seq").Uint64() rng.Read(repr[batchrepr.HeaderLen:]) - batchrepr.SetSeqNum(repr, seq) + batchrepr.SetSeqNum(repr, base.SeqNum(seq)) batchrepr.SetCount(repr, count) }
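
Note on the internal/base helpers assumed above: the hunks in this diff repeatedly convert raw uint64 sequence numbers to base.SeqNum, parse test tokens with base.ParseSeqNum, swap atomic.Uint64 fields for base.AtomicSeqNum, and treat key trailers as base.Trailer with SeqNum() and Kind() accessors. The following is a minimal sketch of the shape those declarations are assumed to take, inferred only from how the diff uses them (for example, the removed keyspan code computed a key's sequence number as Trailer >> 8). It is not the actual internal/base implementation; in particular, ParseSeqNum panicking on malformed input is an assumption based on the tests above dropping their strconv error handling.

// Sketch only: approximate shape of the internal/base declarations used above.
package base

import (
	"strconv"
	"sync/atomic"
)

// InternalKeyKind occupies the low byte of a trailer (assumed uint8 here).
type InternalKeyKind uint8

// SeqNum is a sequence number, previously passed around as a raw uint64.
type SeqNum uint64

// ParseSeqNum parses a decimal sequence number from a datadriven test token.
func ParseSeqNum(s string) SeqNum {
	n, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		panic(err) // assumed: callers in the tests above no longer handle a parse error
	}
	return SeqNum(n)
}

// AtomicSeqNum is an atomically updated SeqNum; versionSet.logSeqNum and the
// memtable test above use its Store and Load methods.
type AtomicSeqNum struct {
	v atomic.Uint64
}

func (a *AtomicSeqNum) Load() SeqNum   { return SeqNum(a.v.Load()) }
func (a *AtomicSeqNum) Store(s SeqNum) { a.v.Store(uint64(s)) }

// Trailer packs a sequence number and kind; the removed keyspan.Key.SeqNum
// returned Trailer >> 8, so the kind lives in the low byte.
type Trailer uint64

func MakeTrailer(seqNum SeqNum, kind InternalKeyKind) Trailer {
	return Trailer(seqNum)<<8 | Trailer(kind)
}

func (t Trailer) SeqNum() SeqNum        { return SeqNum(t >> 8) }
func (t Trailer) Kind() InternalKeyKind { return InternalKeyKind(t & 0xff) }

The encodings themselves are unchanged by the retyping: version_edit.go still round-trips sequence numbers through writeUvarint/readUvarint via explicit uint64 conversions, and batch snapshot sequence numbers are still OR'd with base.InternalKeySeqNumBatch, so the change is about type safety at call sites rather than any change to the on-disk or in-memory format.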