Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

leveldb: introduce trivial version finalization #264

Merged
merged 1 commit into from
Feb 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions leveldb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func recoverTable(s *session, o *opt.Options) error {
}

// Commit.
return s.commit(rec)
return s.commit(rec, false)
}

func (db *DB) recoverJournal() error {
Expand Down Expand Up @@ -538,7 +538,7 @@ func (db *DB) recoverJournal() error {

rec.setJournalNum(fd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
if err := db.s.commit(rec, false); err != nil {
fr.Close()
return err
}
Expand Down Expand Up @@ -617,7 +617,7 @@ func (db *DB) recoverJournal() error {
// Commit.
rec.setJournalNum(db.journalFd.Num)
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
if err := db.s.commit(rec, false); err != nil {
// Close journal on error.
if db.journal != nil {
db.journal.Close()
Expand Down
2 changes: 1 addition & 1 deletion leveldb/db_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) {
db.compCommitLk.Lock()
defer db.compCommitLk.Unlock() // Defer is necessary.
db.compactionTransactFunc(name+"@commit", func(cnt *compactionTransactCounter) error {
return db.s.commit(rec)
return db.s.commit(rec, true)
}, nil)
}

Expand Down
10 changes: 5 additions & 5 deletions leveldb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
if o == nil {
o = &opt.Options{
DisableLargeBatchTransaction: true,
Filter: testingBloomFilter,
Filter: testingBloomFilter,
}
} else {
old := o
Expand Down Expand Up @@ -2558,7 +2558,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
}
rec := &sessionRecord{}
rec.addTableFile(i, tf)
if err := s.commit(rec); err != nil {
if err := s.commit(rec, false); err != nil {
t.Fatal(err)
}
}
Expand All @@ -2582,7 +2582,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
for _, t := range c.levels[0] {
rec.delTable(c.sourceLevel, t.fd.Num)
}
if err := s.commit(rec); err != nil {
if err := s.commit(rec, false); err != nil {
t.Fatal(err)
}
c.release()
Expand Down Expand Up @@ -2611,7 +2611,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
rec.delTable(2, t.fd.Num)
rec.addTableFile(3, t)
}
if err := s.commit(rec); err != nil {
if err := s.commit(rec, false); err != nil {
t.Fatal(err)
}
c.release()
Expand Down Expand Up @@ -2653,7 +2653,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
break
}
}
if err := s.commit(rec); err != nil {
if err := s.commit(rec, false); err != nil {
t.Fatal(err)
}
c.release()
Expand Down
2 changes: 1 addition & 1 deletion leveldb/db_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (tr *Transaction) Commit() error {
tr.stats.startTimer()
var cerr error
for retry := 0; retry < 3; retry++ {
cerr = tr.db.s.commit(&tr.rec)
cerr = tr.db.s.commit(&tr.rec, false)
if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select {
Expand Down
6 changes: 3 additions & 3 deletions leveldb/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,19 @@ func (s *session) recover() (err error) {
}

s.manifestFd = fd
s.setVersion(staging.finish())
s.setVersion(staging.finish(false))
s.setNextFileNum(rec.nextFileNum)
s.recordCommited(rec)
return nil
}

// Commit session; need external synchronization.
func (s *session) commit(r *sessionRecord) (err error) {
func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
v := s.version()
defer v.release()

// spawn new version based on current version
nv := v.spawn(r)
nv := v.spawn(r, trivial)

if s.manifest == nil {
// manifest journal writer not yet created, create one
Expand Down
8 changes: 8 additions & 0 deletions leveldb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
})
}

// searchNumLess returns the smallest index i for which tf[i].fd.Num is
// less than num, or len(tf) when no table has a smaller file number.
// It uses binary search, so the result is only meaningful when tf is
// ordered such that the "less than num" condition is false for a prefix
// of the list and true for the remainder.
func (tf tFiles) searchNumLess(num int64) int {
	hasSmallerNum := func(i int) bool {
		return tf[i].fd.Num < num
	}
	return sort.Search(len(tf), hasSmallerNum)
}

// Returns true if given key range overlaps with one or more
// tables key range. If unsorted is true then binary search will not be used.
func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
Expand Down
41 changes: 38 additions & 3 deletions leveldb/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ func (v *version) newStaging() *versionStaging {
}

// Spawn a new version based on this version.
func (v *version) spawn(r *sessionRecord) *version {
func (v *version) spawn(r *sessionRecord, trivial bool) *version {
staging := v.newStaging()
staging.commit(r)
return staging.finish()
return staging.finish(trivial)
}

func (v *version) fillRecord(r *sessionRecord) {
Expand Down Expand Up @@ -446,7 +446,7 @@ func (p *versionStaging) commit(r *sessionRecord) {
}
}

func (p *versionStaging) finish() *version {
func (p *versionStaging) finish(trivial bool) *version {
// Build new version.
nv := newVersion(p.base.s)
numLevel := len(p.levels)
Expand All @@ -463,6 +463,12 @@ func (p *versionStaging) finish() *version {
if level < len(p.levels) {
scratch := p.levels[level]

// Short circuit if there is no change at all.
if len(scratch.added) == 0 && len(scratch.deleted) == 0 {
nv.levels[level] = baseTabels
continue
}

var nt tFiles
// Prealloc list if possible.
if n := len(baseTabels) + len(scratch.added) - len(scratch.deleted); n > 0 {
Expand All @@ -480,6 +486,35 @@ func (p *versionStaging) finish() *version {
nt = append(nt, t)
}

// For normal table compaction, one compaction will only involve two levels
// of files. And the new files generated after merging the source level and
// source+1 level related files can be inserted as a whole into source+1 level
// without any overlap with the other source+1 files.
//
// When the amount of data maintained by leveldb is large, the number of files
// per level will be very large, and qsort is very inefficient for sorting
// already ordered arrays. Therefore, for normal table compaction, we use
// binary search here to find the insertion index and insert the batch of newly
// added files directly instead of using qsort.
if trivial && len(scratch.added) > 0 {
added := make(tFiles, 0, len(scratch.added))
for _, r := range scratch.added {
added = append(added, tableFileFromRecord(r))
}
if level == 0 {
added.sortByNum()
index := nt.searchNumLess(added[len(added)-1].fd.Num)
nt = append(nt[:index], append(added, nt[index:]...)...)
} else {
added.sortByKey(p.base.s.icmp)
_, amax := added.getRange(p.base.s.icmp)
index := nt.searchMin(p.base.s.icmp, amax)
nt = append(nt[:index], append(added, nt[index:]...)...)
}
nv.levels[level] = nt
continue
}

// New tables.
for _, r := range scratch.added {
nt = append(nt, tableFileFromRecord(r))
Expand Down
99 changes: 97 additions & 2 deletions leveldb/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package leveldb

import (
"encoding/binary"
"math/rand"
"reflect"
"testing"

"github.com/onsi/gomega"

"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/testutil"
)

Expand Down Expand Up @@ -35,6 +36,7 @@ func TestVersionStaging(t *testing.T) {

for i, x := range []struct {
add, del []testFileRec
trivial bool
levels [][]int64
}{
{
Expand Down Expand Up @@ -149,6 +151,46 @@ func TestVersionStaging(t *testing.T) {
{2},
},
},
// memory compaction
{
add: []testFileRec{
{0, 5},
},
trivial: true,
levels: [][]int64{
{5, 3, 1},
{2},
},
},
// memory compaction
{
add: []testFileRec{
{0, 4},
},
trivial: true,
levels: [][]int64{
{5, 4, 3, 1},
{2},
},
},
// table compaction
{
add: []testFileRec{
{1, 6},
{1, 7},
{1, 8},
},
del: []testFileRec{
{0, 3},
{0, 4},
{0, 5},
},
trivial: true,
levels: [][]int64{
{1},
{2, 6, 7, 8},
},
},
} {
rec := &sessionRecord{}
for _, f := range x.add {
Expand All @@ -160,7 +202,7 @@ func TestVersionStaging(t *testing.T) {
}
vs := v.newStaging()
vs.commit(rec)
v = vs.finish()
v = vs.finish(x.trivial)
if len(v.levels) != len(x.levels) {
t.Fatalf("#%d: invalid level count: want=%d got=%d", i, len(x.levels), len(v.levels))
}
Expand All @@ -179,3 +221,56 @@ func TestVersionStaging(t *testing.T) {
}
}
}

// BenchmarkVersionStagingNonTrivial runs the version staging benchmark
// with the trivial flag disabled against 100000 pre-populated tables.
func BenchmarkVersionStagingNonTrivial(b *testing.B) {
	const (
		trivial = false
		tables  = 100000
	)
	benchmarkVersionStaging(b, trivial, tables)
}

// BenchmarkVersionStagingTrivial runs the version staging benchmark
// with the trivial flag enabled against 100000 pre-populated tables.
func BenchmarkVersionStagingTrivial(b *testing.B) {
	const (
		trivial = true
		tables  = 100000
	)
	benchmarkVersionStaging(b, trivial, tables)
}

// benchmarkVersionStaging measures the cost of staging a small session
// record on top of a version that already holds size tables and then
// finishing it with the given trivial flag.
//
// Setup (untimed): a session backed by in-memory storage is created and a
// base version is built from a record containing size tables, each keyed by
// its own index. The base version is always finished with trivial=false.
//
// Timed loop: each iteration picks a random index, builds a record with up
// to four addTable calls (fewer when the index is near the end of the
// range), commits it to a fresh staging of the base version and finishes
// it with the trivial flag under test.
func benchmarkVersionStaging(b *testing.B, trivial bool, size int) {
	mem := storage.NewMemStorage()
	defer mem.Close()

	sess, err := newSession(mem, nil)
	if err != nil {
		b.Fatal(err)
	}

	base := newVersion(sess)
	// NOTE(review): the staging returned here is discarded; this looks like
	// a leftover call — confirm it has no side effects before removing it.
	base.newStaging()

	// keyBuf is shared scratch space for encoding the 4-byte user key.
	keyBuf := make([]byte, 4)
	makeKey := func(n uint64) []byte {
		binary.BigEndian.PutUint32(keyBuf, uint32(n))
		return []byte(makeInternalKey(nil, keyBuf, 0, keyTypeVal))
	}

	// Populate the base version (first addTable argument is presumably the
	// level — verify against sessionRecord.addTable).
	seed := &sessionRecord{}
	for num := 0; num < size; num++ {
		key := makeKey(uint64(num))
		seed.addTable(1, int64(num), 1, key, key)
	}
	setup := base.newStaging()
	setup.commit(seed)
	base = setup.finish(false)

	b.ResetTimer()
	b.ReportAllocs()

	for i := 0; i < b.N; i++ {
		index := rand.Intn(size)
		key := makeKey(uint64(index))

		// At most four additions, bounded by the distance to the end of
		// the populated range.
		additions := size - index
		if additions > 4 {
			additions = 4
		}

		rec := &sessionRecord{}
		for k := 0; k < additions; k++ {
			rec.addTable(1, int64(i), 1, key, key)
		}

		staging := base.newStaging()
		staging.commit(rec)
		staging.finish(trivial)
	}
}