Skip to content

Commit

Permalink
db: use atomic for versionSet.nextFileNum
Browse files Browse the repository at this point in the history
Currently, the nextFileNum counter is protected by db.mu, which is
unnecessary and forces us to grab the db mutex just to generate a
new file number. This change converts the counter to an atomic so
that file numbers can be allocated without holding db.mu.
  • Loading branch information
itsbilal committed Sep 30, 2024
1 parent 7dcd4f4 commit eb9d37a
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 24 deletions.
2 changes: 0 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2667,9 +2667,7 @@ func (c *compaction) makeVersionEdit(result compact.Result) (*versionEdit, error
func (d *DB) newCompactionOutput(
jobID JobID, c *compaction, writerOpts sstable.WriterOptions,
) (objstorage.ObjectMetadata, sstable.RawWriter, CPUWorkHandle, error) {
d.mu.Lock()
diskFileNum := d.mu.versions.getNextDiskFileNum()
d.mu.Unlock()

var writeCategory vfs.DiskWriteCategory
if d.opts.EnableSQLRowSpillMetrics {
Expand Down
4 changes: 1 addition & 3 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,11 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
reset()

loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata {
d.mu.Lock()
pendingOutputs := make([]base.FileNum, len(paths))
for i := range paths {
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}
jobID := d.newJobIDLocked()
d.mu.Unlock()
jobID := d.newJobID()

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
Expand Down
4 changes: 1 addition & 3 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1355,14 +1355,12 @@ func (d *DB) ingest(
// the file number ordering to be out of alignment with sequence number
// ordering. The sorting of L0 tables by sequence number avoids relying on
// that (busted) invariant.
d.mu.Lock()
pendingOutputs := make([]base.FileNum, len(paths)+len(shared)+len(external))
for i := 0; i < len(paths)+len(shared)+len(external); i++ {
pendingOutputs[i] = d.mu.versions.getNextFileNum()
}

jobID := d.newJobIDLocked()
d.mu.Unlock()
jobID := d.newJobID()

// Load the metadata for all the files being ingested. This step detects
// and elides empty sstables.
Expand Down
4 changes: 2 additions & 2 deletions table_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,9 +283,9 @@ func TestVirtualReadsWiring(t *testing.T) {
l6 := currVersion.Levels[6]
l6FileIter := l6.Iter()
parentFile := l6FileIter.First()
f1 := FileNum(d.mu.versions.nextFileNum)
f1 := FileNum(d.mu.versions.nextFileNum.Load())
f2 := f1 + 1
d.mu.versions.nextFileNum += 2
d.mu.versions.nextFileNum.Add(2)

seqNumA := parentFile.Smallest.SeqNum()
// See SeqNum comments above.
Expand Down
28 changes: 14 additions & 14 deletions version_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ type versionSet struct {

// The next file number. A single counter is used to assign file
// numbers for the WAL, MANIFEST, sstable, and OPTIONS files.
nextFileNum uint64
nextFileNum atomic.Uint64

// The current manifest file number.
manifestFileNum base.DiskFileNum
Expand Down Expand Up @@ -154,7 +154,7 @@ func (vs *versionSet) init(
vs.obsoleteFn = vs.addObsoleteLocked
vs.zombieTables = make(map[base.DiskFileNum]tableInfo)
vs.virtualBackings = manifest.MakeVirtualBackings()
vs.nextFileNum = 1
vs.nextFileNum.Store(1)
vs.manifestMarker = marker
vs.getFormatMajorVersion = getFMV
}
Expand All @@ -181,7 +181,7 @@ func (vs *versionSet) create(
// Note that a "snapshot" version edit is written to the manifest when it is
// created.
vs.manifestFileNum = vs.getNextDiskFileNum()
err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum, nil /* virtualBackings */)
err = vs.createManifest(vs.dirname, vs.manifestFileNum, vs.minUnflushedLogNum, vs.nextFileNum.Load(), nil /* virtualBackings */)
if err == nil {
if err = vs.manifest.Flush(); err != nil {
vs.opts.Logger.Fatalf("MANIFEST flush failed: %v", err)
Expand Down Expand Up @@ -270,7 +270,7 @@ func (vs *versionSet) load(
vs.minUnflushedLogNum = ve.MinUnflushedLogNum
}
if ve.NextFileNum != 0 {
vs.nextFileNum = ve.NextFileNum
vs.nextFileNum.Store(ve.NextFileNum)
}
if ve.LastSeqNum != 0 {
// logSeqNum is the _next_ sequence number that will be assigned,
Expand All @@ -294,7 +294,7 @@ func (vs *versionSet) load(
// function and could have only updated it to some other non-zero value,
// so it cannot be 0 here.
if vs.minUnflushedLogNum == 0 {
if vs.nextFileNum >= 2 {
if vs.nextFileNum.Load() >= 2 {
// We either have a freshly created DB, or a DB created by RocksDB
// that has not had a single flushed SSTable yet. This is because
// RocksDB bumps up nextFileNum in this case without bumping up
Expand Down Expand Up @@ -441,7 +441,7 @@ func (vs *versionSet) logAndApply(

if ve.MinUnflushedLogNum != 0 {
if ve.MinUnflushedLogNum < vs.minUnflushedLogNum ||
vs.nextFileNum <= uint64(ve.MinUnflushedLogNum) {
vs.nextFileNum.Load() <= uint64(ve.MinUnflushedLogNum) {
panic(fmt.Sprintf("pebble: inconsistent versionEdit minUnflushedLogNum %d",
ve.MinUnflushedLogNum))
}
Expand All @@ -452,7 +452,7 @@ func (vs *versionSet) logAndApply(
// current filenum and not the next one.
//
// TODO(sbhola): figure out why this is correct and update comment.
ve.NextFileNum = vs.nextFileNum
ve.NextFileNum = vs.nextFileNum.Load()

// LastSeqNum is set to the current upper bound on the assigned sequence
// numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
Expand Down Expand Up @@ -542,7 +542,7 @@ func (vs *versionSet) logAndApply(
// Grab certain values before releasing vs.mu, in case createManifest() needs
// to be called.
minUnflushedLogNum := vs.minUnflushedLogNum
nextFileNum := vs.nextFileNum
nextFileNum := vs.nextFileNum.Load()

// Note: this call populates ve.RemovedBackingTables.
zombieBackings, removedVirtualBackings, localLiveSizeDelta :=
Expand Down Expand Up @@ -949,21 +949,21 @@ func (vs *versionSet) createManifest(
return nil
}

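// NB: This method is not safe for concurrent use: the Load and Store below
// are two separate atomic operations, so an update to nextFileNum that lands
// between them could be overwritten. It is only safe to call when concurrent
// changes to nextFileNum are not expected.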
func (vs *versionSet) markFileNumUsed(fileNum base.DiskFileNum) {
if vs.nextFileNum <= uint64(fileNum) {
vs.nextFileNum = uint64(fileNum + 1)
if vs.nextFileNum.Load() <= uint64(fileNum) {
vs.nextFileNum.Store(uint64(fileNum + 1))
}
}

func (vs *versionSet) getNextFileNum() base.FileNum {
x := vs.nextFileNum
vs.nextFileNum++
x := vs.nextFileNum.Add(1) - 1
return base.FileNum(x)
}

func (vs *versionSet) getNextDiskFileNum() base.DiskFileNum {
x := vs.nextFileNum
vs.nextFileNum++
x := vs.nextFileNum.Add(1) - 1
return base.DiskFileNum(x)
}

Expand Down

0 comments on commit eb9d37a

Please sign in to comment.