From adeb84282b7dda7f7ad595ff6061aa9a330a2ef1 Mon Sep 17 00:00:00 2001 From: Balaji Jinnah Date: Mon, 15 Jun 2020 13:49:30 +0530 Subject: [PATCH 1/4] Fix assert in background compression and encryption. (#1366) * Use an assert to check whether the async compression/encryption output is able to fit in the destination block's allocated space. Allocated space is calculated using (item.end-item.start) + padding + 1 (cherry picked from commit c45d966681d4a058df1190b10f1b5f60c9ab0b6d) --- table/builder.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/table/builder.go b/table/builder.go index d32d9c6aa..adf064d9a 100644 --- a/table/builder.go +++ b/table/builder.go @@ -149,13 +149,13 @@ func (b *Builder) handleBlock() { blockBuf = eBlock } - // The newend should always be less than or equal to the original end - // plus the padding. If the new end is greater than item.end+padding - // that means the data from this block cannot be stored in its existing - // location and trying to copy it over would mean we would over-write - // some data of the next block. - y.AssertTruef(uint32(len(blockBuf)) <= item.end+padding, - "newend: %d item.end: %d padding: %d", len(blockBuf), item.end, padding) + // BlockBuf should always be less than or equal to allocated space. If the blockBuf is greater + // than allocated space that means the data from this block cannot be stored in its + // existing location and trying to copy it over would mean we would over-write some data + // of the next block. + allocatedSpace := (item.end - item.start) + padding + 1 + y.AssertTruef(uint32(len(blockBuf)) <= allocatedSpace, "newend: %d oldend: %d padding: %d", + item.start+uint32(len(blockBuf)), item.end, padding) // Acquire the buflock here. The builder.grow function might change // the b.buf while this goroutine was running. 
From e5fd05ad464cda4851dc9f96736bb43ace08584b Mon Sep 17 00:00:00 2001 From: Damien Tournoud Date: Sat, 27 Jun 2020 00:02:47 -0700 Subject: [PATCH 2/4] Rework DB.DropPrefix (#1381) Fixes three issues with the current implementation: - It can generate compaction requests that break the invariant that bottom tables need to be consecutive (issue #1380). See https://github.com/dgraph-io/badger/issues/1380#issuecomment-649448069 - It performs the same level compactions in increasing order of levels (starting from L0) which leads to old versions of keys for the prefix re-surfacing to active transactions. - When you have to drop multiple prefixes, the API forces you to drop one prefix at a time and go through the whole expensive table rewriting multiple times. Fixes #1381 Co-authored-by: Ibrahim Jarif (cherry picked from commit e013bfd25899ec014d7117fa6e94b17a114d9921) --- db.go | 16 +++--- levels.go | 141 ++++++++++++++++++++++++++++++++----------------- levels_test.go | 40 ++++++++++++++ util.go | 4 +- 4 files changed, 142 insertions(+), 59 deletions(-) diff --git a/db.go b/db.go index 79b332d23..9fca7a1fc 100644 --- a/db.go +++ b/db.go @@ -940,7 +940,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { defer b.Close() var vp valuePointer for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } vs := iter.Value() @@ -953,9 +953,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer - dropPrefix []byte + mt *skl.Skiplist + vptr valuePointer + dropPrefixes [][]byte } // handleFlushTask must be run serially. @@ -1584,7 +1584,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. 
// - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefix []byte) error { +func (db *DB) DropPrefix(prefixes ...[]byte) error { db.opt.Infof("DropPrefix Called") f, err := db.prepareToDrop() if err != nil { @@ -1604,8 +1604,8 @@ func (db *DB) DropPrefix(prefix []byte) error { task := flushTask{ mt: memtable, // Ensure that the head of value log gets persisted to disk. - vptr: db.vhead, - dropPrefix: prefix, + vptr: db.vhead, + dropPrefixes: prefixes, } db.opt.Debugf("Flushing memtable") if err := db.handleFlushTask(task); err != nil { @@ -1620,7 +1620,7 @@ func (db *DB) DropPrefix(prefix []byte) error { db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Drop prefixes from the levels. - if err := db.lc.dropPrefix(prefix); err != nil { + if err := db.lc.dropPrefixes(prefixes); err != nil { return err } db.opt.Infof("DropPrefix done") diff --git a/levels.go b/levels.go index 764fcd944..e52072f76 100644 --- a/levels.go +++ b/levels.go @@ -274,9 +274,25 @@ func (s *levelsController) dropTree() (int, error) { // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. -func (s *levelsController) dropPrefix(prefix []byte) error { +func (s *levelsController) dropPrefixes(prefixes [][]byte) error { + // Internal move keys related to the given prefix should also be skipped. + for _, prefix := range prefixes { + key := make([]byte, 0, len(badgerMove)+len(prefix)) + key = append(key, badgerMove...) + key = append(key, prefix...) 
+ prefixes = append(prefixes, key) + } + opt := s.kv.opt - for _, l := range s.levels { + // Iterate levels in the reverse order because if we were to iterate from + // lower level (say level 0) to a higher level (say level 3) we could have + // a state in which level 0 is compacted and an older version of a key exists in lower level. + // At this point, if someone creates an iterator, they would see an old + // value for a key from lower levels. Iterating in reverse order ensures we + // drop the oldest data first so that lookups never return stale data. + for i := len(s.levels) - 1; i >= 0; i-- { + l := s.levels[i] + l.RLock() if l.level == 0 { size := len(l.tables) @@ -288,7 +304,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error { score: 1.74, // A unique number greater than 1.0 does two things. Helps identify this // function in logs, and forces a compaction. - dropPrefix: prefix, + dropPrefixes: prefixes, } if err := s.doCompact(cp); err != nil { opt.Warningf("While compacting level 0: %v", err) @@ -298,39 +314,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error { continue } - var tables []*table.Table - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, prefix...) - prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} - for _, table := range l.tables { - var absent bool - switch { - case hasAnyPrefixes(table.Smallest(), prefixesToSkip): - case hasAnyPrefixes(table.Biggest(), prefixesToSkip): - case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): - default: - absent = true + // Build a list of compaction tableGroups affecting all the prefixes we + // need to drop. We need to build tableGroups that satisfy the invariant that + // bottom tables are consecutive. + // tableGroup contains groups of consecutive tables. 
+ var tableGroups [][]*table.Table + var tableGroup []*table.Table + + finishGroup := func() { + if len(tableGroup) > 0 { + tableGroups = append(tableGroups, tableGroup) + tableGroup = nil } - if !absent { - tables = append(tables, table) + } + + for _, table := range l.tables { + if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + tableGroup = append(tableGroup, table) + } else { + finishGroup() } } + finishGroup() + l.RUnlock() - if len(tables) == 0 { + + if len(tableGroups) == 0 { continue } - cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), - thisLevel: l, - nextLevel: l, - top: []*table.Table{}, - bot: tables, - dropPrefix: prefix, - } - if err := s.runCompactDef(l.level, cd); err != nil { - opt.Warningf("While running compact def: %+v. Error: %v", cd, err) - return err + opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups)) + for _, operation := range tableGroups { + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: nil, + bot: operation, + dropPrefixes: prefixes, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. Error: %v", cd, err) + return err + } } } return nil @@ -395,9 +421,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 - dropPrefix []byte + level int + score float64 + dropPrefixes [][]byte } // pickCompactLevel determines which level to compact. @@ -491,13 +517,19 @@ func (s *levelsController) compactBuildTables( // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. var valid []*table.Table + +nextTable: for _, table := range botTables { - if len(cd.dropPrefix) > 0 && - bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && - bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { - // All the keys in this table have the dropPrefix. 
So, this table does not need to be - // in the iterator and can be dropped immediately. - continue + if len(cd.dropPrefixes) > 0 { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(table.Smallest(), prefix) && + bytes.HasPrefix(table.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this + // table does not need to be in the iterator and can be + // dropped immediately. + continue nextTable + } + } } valid = append(valid, table) } @@ -535,12 +567,9 @@ func (s *levelsController) compactBuildTables( bopts.BfCache = s.kv.bfCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) - prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { + if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { numSkips++ updateStats(it.Value()) continue @@ -715,10 +744,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } +func containsPrefix(smallValue, largeValue, prefix []byte) bool { + if bytes.HasPrefix(smallValue, prefix) { + return true + } + if bytes.HasPrefix(largeValue, prefix) { + return true + } + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + + return false +} + func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if bytes.Compare(prefix, smallValue) > 0 && - bytes.Compare(prefix, largeValue) < 0 { + if containsPrefix(smallValue, largeValue, prefix) { return true } } @@ -740,7 +783,7 @@ type compactDef struct { thisSize int64 - dropPrefix []byte + dropPrefixes [][]byte } func (cd *compactDef) lockLevels() { @@ -902,10 +945,10 @@ func (s *levelsController) 
doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], - dropPrefix: p.dropPrefix, + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefixes: p.dropPrefixes, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish() diff --git a/levels_test.go b/levels_test.go index 8c7df15bb..853572ded 100644 --- a/levels_test.go +++ b/levels_test.go @@ -564,3 +564,43 @@ func TestL0Stall(t *testing.T) { test(t, &opt) }) } + +// Regression test for https://github.com/dgraph-io/dgraph/issues/5573 +func TestDropPrefixMoveBug(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // l1 is used to verify that drop prefix actually drops move keys from all the levels. + l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}} + createAndOpen(db, l1, 1) + + // Multiple levels can have the exact same move key with version. + l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}} + l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}} + l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}} + + // Level 2 has all the tables. 
+ createAndOpen(db, l2, 2) + createAndOpen(db, l21, 2) + createAndOpen(db, l22, 2) + + require.NoError(t, db.lc.validate()) + require.NoError(t, db.DropPrefix([]byte("F"))) + + db.View(func(txn *Txn) error { + iopt := DefaultIteratorOptions + iopt.AllVersions = true + + it := txn.NewIterator(iopt) + defer it.Close() + + specialKey := []byte("F") + droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)} + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().Key() + // Ensure we don't have any "F" or "!badger!move!F" left + require.False(t, hasAnyPrefixes(key, droppedPrefixes)) + } + return nil + }) + require.NoError(t, db.lc.validate()) + }) +} diff --git a/util.go b/util.go index b7f173dd3..ccf7939f3 100644 --- a/util.go +++ b/util.go @@ -60,8 +60,8 @@ func (s *levelHandler) validate() error { if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 { return errors.Errorf( - "Intra: %q vs %q: level=%d j=%d numTables=%d", - s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables) + "Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d", + hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables) } } return nil From fac972feb849093aa94109a57c978f3fa501b017 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 30 Jun 2020 21:00:38 +0530 Subject: [PATCH 3/4] Update head while replaying value log (#1372) Fixes https://github.com/dgraph-io/badger/issues/1363 The head pointer is not updated when we perform replays. The head pointer would be updated only when the replay completes. If badger crashes between the point when replay started and replay finished, we would end up replaying all the value log files. This PR fixes this issue. 
(cherry picked from commit 509de73a8ad1c1b8ea62ffc0cef944716435ec25) --- db.go | 10 ++++++++-- value_test.go | 13 ++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/db.go b/db.go index 9fca7a1fc..fc908dbf5 100644 --- a/db.go +++ b/db.go @@ -136,6 +136,10 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error { } else { nv = vp.Encode() meta = meta | bitValuePointer + // Update vhead. If the crash happens while replay was in progress + // and the head is not updated, we will end up replaying all the + // files again. + db.updateHead([]valuePointer{vp}) } v := y.ValueStruct{ @@ -638,6 +642,8 @@ func (db *DB) get(key []byte) (y.ValueStruct, error) { return db.lc.get(key, maxVs, 0) } +// updateHead should not be called without the db.Lock() since db.vhead is used +// by the writer goroutines and memtable flushing goroutine. func (db *DB) updateHead(ptrs []valuePointer) { var ptr valuePointer for i := len(ptrs) - 1; i >= 0; i-- { @@ -651,8 +657,6 @@ func (db *DB) updateHead(ptrs []valuePointer) { return } - db.Lock() - defer db.Unlock() y.AssertTrue(!ptr.Less(db.vhead)) db.vhead = ptr } @@ -751,7 +755,9 @@ func (db *DB) writeRequests(reqs []*request) error { done(err) return errors.Wrap(err, "writeRequests") } + db.Lock() db.updateHead(b.Ptrs) + db.Unlock() } done(nil) db.opt.Debugf("%d entries written", count) diff --git a/value_test.go b/value_test.go index 8c3c67a8a..de537837f 100644 --- a/value_test.go +++ b/value_test.go @@ -370,7 +370,6 @@ func TestValueGC4(t *testing.T) { kv, err := Open(opt) require.NoError(t, err) - defer kv.Close() sz := 128 << 10 // 5 entries per value log file. 
txn := kv.NewTransaction(true) @@ -409,11 +408,9 @@ func TestValueGC4(t *testing.T) { kv.vlog.rewrite(lf0, tr) kv.vlog.rewrite(lf1, tr) - err = kv.vlog.Close() - require.NoError(t, err) + require.NoError(t, kv.Close()) - kv.vlog.init(kv) - err = kv.vlog.open(kv, valuePointer{Fid: 2}, kv.replayFunction()) + kv, err = Open(opt) require.NoError(t, err) for i := 0; i < 8; i++ { @@ -435,6 +432,7 @@ func TestValueGC4(t *testing.T) { return nil })) } + require.NoError(t, kv.Close()) } func TestPersistLFDiscardStats(t *testing.T) { @@ -644,6 +642,11 @@ func TestPartialAppendToValueLog(t *testing.T) { // Replay value log from beginning, badger head is past k2. require.NoError(t, kv.vlog.Close()) + // clean up the current db.vhead so that we can replay from the beginning. + // If we don't clear the current vhead, badger will error out since new + // head passed while opening vlog is zero in the following lines. + kv.vhead = valuePointer{} + kv.vlog.init(kv) require.NoError( t, kv.vlog.open(kv, valuePointer{Fid: 0}, kv.replayFunction()), From 25fd0ef636d25b8505cd9e6e29d8f87b7b5432af Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Tue, 30 Jun 2020 22:04:23 +0530 Subject: [PATCH 4/4] Update ristretto to commit f66de99 (#1391) The update brings following changes from ristretto into badger ``` f66de99 Improve handling of updated items (https://github.com/dgraph-io/ristretto/pull/168) aec7994 Fix flaky TestCacheDel test (https://github.com/dgraph-io/ristretto/pull/169) a093fe6 Add benchmark plots to the project repo (https://github.com/dgraph-io/ristretto/pull/166) 49dc42c Add Anurag as codeowner (https://github.com/dgraph-io/ristretto/pull/158) 7a3f2d3 z: use MemHashString and xxhash.Sum64String (https://github.com/dgraph-io/ristretto/pull/153) 9c31bb2 Check conflict key before updating expiration map. 
(https://github.com/dgraph-io/ristretto/pull/154) 62cb731 Fix race condition in Cache.Clear (https://github.com/dgraph-io/ristretto/pull/133) 9af1934 Docs and whitespace changes for readability. (https://github.com/dgraph-io/ristretto/pull/143) dbc185e Add changelog. (https://github.com/dgraph-io/ristretto/pull/142) ff325ad Remove key from policy after TTL eviction (https://github.com/dgraph-io/ristretto/pull/130) 2dd5ff5 Use the require library in all the tests. (https://github.com/dgraph-io/ristretto/pull/128) 7c48141 Use require in all tests in cache_test.go (https://github.com/dgraph-io/ristretto/pull/127) 51e97ad Sets with TTL (https://github.com/dgraph-io/ristretto/pull/122) d3e7c37 Add benchmarks for math.rand and fastrand (https://github.com/dgraph-io/ristretto/pull/118) 593823e Integrate fixes from PR https://github.com/dgraph-io/ristretto/pull/91. (https://github.com/dgraph-io/ristretto/pull/126) 29b4dd7 Fix comments. (https://github.com/dgraph-io/ristretto/pull/123) ddf345c Removed workflows directory. (https://github.com/dgraph-io/ristretto/pull/124) eb104d0 Add martinmr to CODEOWNERS file. 
(https://github.com/dgraph-io/ristretto/pull/125) ``` (cherry picked from commit 09dfa663bbc3d84655d3b3a617416273a8e0b385) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index eae04e485..6cb85b77c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 + github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index 4c71dbdf4..a4aa207f9 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU= -github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=