From 8075bd653f00e8ee21af4825cd75c3c98ac93af0 Mon Sep 17 00:00:00 2001 From: Damien Tournoud Date: Mon, 22 Jun 2020 08:46:14 -0700 Subject: [PATCH 1/4] Rework DB.DropPrefix Fixes three issues with the current implementation: - It can generate compaction requests that break the invariant that bottom tables need to be consecutive (issue #1380) - It performs the same level compactions in increasing order of levels (starting from L0) which leads to old versions of keys for the prefix re-surfacing to active transactions - When you have to drop multiple prefixes, the API forces you to drop one prefix at a time and go through the whole expensive table rewriting multiple times. --- db.go | 16 +++---- levels.go | 133 ++++++++++++++++++++++++++++++++++-------------------- 2 files changed, 92 insertions(+), 57 deletions(-) diff --git a/db.go b/db.go index 2234843f2..0de551cd9 100644 --- a/db.go +++ b/db.go @@ -974,7 +974,7 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { defer b.Close() var vp valuePointer for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } vs := iter.Value() @@ -987,9 +987,9 @@ func buildL0Table(ft flushTask, bopts table.Options) []byte { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer - dropPrefix []byte + mt *skl.Skiplist + vptr valuePointer + dropPrefixes [][]byte } // handleFlushTask must be run serially. @@ -1618,7 +1618,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. 
-func (db *DB) DropPrefix(prefix []byte) error { +func (db *DB) DropPrefix(prefixes ...[]byte) error { db.opt.Infof("DropPrefix Called") f, err := db.prepareToDrop() if err != nil { @@ -1638,8 +1638,8 @@ func (db *DB) DropPrefix(prefix []byte) error { task := flushTask{ mt: memtable, // Ensure that the head of value log gets persisted to disk. - vptr: db.vhead, - dropPrefix: prefix, + vptr: db.vhead, + dropPrefixes: prefixes, } db.opt.Debugf("Flushing memtable") if err := db.handleFlushTask(task); err != nil { @@ -1654,7 +1654,7 @@ func (db *DB) DropPrefix(prefix []byte) error { db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Drop prefixes from the levels. - if err := db.lc.dropPrefix(prefix); err != nil { + if err := db.lc.dropPrefixes(prefixes); err != nil { return err } db.opt.Infof("DropPrefix done") diff --git a/levels.go b/levels.go index 67908faad..ea6fc1174 100644 --- a/levels.go +++ b/levels.go @@ -274,9 +274,19 @@ func (s *levelsController) dropTree() (int, error) { // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. -func (s *levelsController) dropPrefix(prefix []byte) error { +func (s *levelsController) dropPrefixes(prefixes [][]byte) error { + // Internal move keys related to the given prefix should also be skipped. + for _, prefix := range prefixes { + key := make([]byte, 0, len(badgerMove)+len(prefix)) + key = append(key, badgerMove...) + key = append(key, prefix...) + prefixes = append(prefixes, key) + } + opt := s.kv.opt - for _, l := range s.levels { + for i := len(s.levels) - 1; i >= 0; i-- { + l := s.levels[i] + l.RLock() if l.level == 0 { size := len(l.tables) @@ -288,7 +298,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error { score: 1.74, // A unique number greater than 1.0 does two things. 
Helps identify this // function in logs, and forces a compaction. - dropPrefix: prefix, + dropPrefixes: prefixes, } if err := s.doCompact(cp); err != nil { opt.Warningf("While compacting level 0: %v", err) @@ -298,39 +308,48 @@ func (s *levelsController) dropPrefix(prefix []byte) error { continue } - var tables []*table.Table - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, prefix...) - prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} - for _, table := range l.tables { - var absent bool - switch { - case hasAnyPrefixes(table.Smallest(), prefixesToSkip): - case hasAnyPrefixes(table.Biggest(), prefixesToSkip): - case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): - default: - absent = true + // Build a list of compaction operations affecting all the prefixes we + // need to drop. We need to build operations that satisfy the invariant that + // bottom tables are consecutive. + var operations [][]*table.Table + var currentOperation []*table.Table + + nextOperation := func() { + if len(currentOperation) > 0 { + operations = append(operations, currentOperation) + currentOperation = nil } - if !absent { - tables = append(tables, table) + } + + for _, table := range l.tables { + if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + currentOperation = append(currentOperation, table) + } else { + nextOperation() } } + nextOperation() + l.RUnlock() - if len(tables) == 0 { + + if len(operations) == 0 { continue } - cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), - thisLevel: l, - nextLevel: l, - top: []*table.Table{}, - bot: tables, - dropPrefix: prefix, - } - if err := s.runCompactDef(l.level, cd); err != nil { - opt.Warningf("While running compact def: %+v. 
Error: %v", cd, err) - return err + opt.Infof("Dropping prefix at level %d (%d operations)", l.level, len(operations)) + for _, operation := range operations { + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: nil, + bot: operation, + dropPrefixes: prefixes, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. Error: %v", cd, err) + return err + } } } return nil @@ -395,9 +414,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 - dropPrefix []byte + level int + score float64 + dropPrefixes [][]byte } // pickCompactLevel determines which level to compact. @@ -491,13 +510,18 @@ func (s *levelsController) compactBuildTables( // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. var valid []*table.Table + +nextTable: for _, table := range botTables { - if len(cd.dropPrefix) > 0 && - bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && - bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { - // All the keys in this table have the dropPrefix. So, this table does not need to be - // in the iterator and can be dropped immediately. - continue + if len(cd.dropPrefixes) > 0 { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(table.Smallest(), prefix) && + bytes.HasPrefix(table.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this table does not need to be + // in the iterator and can be dropped immediately. + continue nextTable + } + } } valid = append(valid, table) } @@ -535,12 +559,9 @@ func (s *levelsController) compactBuildTables( bopts.BfCache = s.kv.bfCache builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) 
- prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { + if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { numSkips++ updateStats(it.Value()) continue @@ -719,10 +740,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } +func containsPrefix(smallValue, largeValue, prefix []byte) bool { + if bytes.HasPrefix(smallValue, prefix) { + return true + } + if bytes.HasPrefix(largeValue, prefix) { + return true + } + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + + return false +} + func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if bytes.Compare(prefix, smallValue) > 0 && - bytes.Compare(prefix, largeValue) < 0 { + if containsPrefix(smallValue, largeValue, prefix) { return true } } @@ -744,7 +779,7 @@ type compactDef struct { thisSize int64 - dropPrefix []byte + dropPrefixes [][]byte } func (cd *compactDef) lockLevels() { @@ -918,10 +953,10 @@ func (s *levelsController) doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. 
cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], - dropPrefix: p.dropPrefix, + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefixes: p.dropPrefixes, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish() From f27bab4c0677bd93eaf0eea65b7b3c2f1061ab11 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 25 Jun 2020 16:05:16 +0530 Subject: [PATCH 2/4] Add test and fix line length --- levels.go | 5 +++-- levels_test.go | 35 +++++++++++++++++++++++++++++++++++ util.go | 4 ++-- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/levels.go b/levels.go index ea6fc1174..c217191b5 100644 --- a/levels.go +++ b/levels.go @@ -517,8 +517,9 @@ nextTable: for _, prefix := range cd.dropPrefixes { if bytes.HasPrefix(table.Smallest(), prefix) && bytes.HasPrefix(table.Biggest(), prefix) { - // All the keys in this table have the dropPrefix. So, this table does not need to be - // in the iterator and can be dropped immediately. + // All the keys in this table have the dropPrefix. So, this + // table does not need to be in the iterator and can be + // dropped immediately. 
continue nextTable } } diff --git a/levels_test.go b/levels_test.go index b6980298b..aa9049421 100644 --- a/levels_test.go +++ b/levels_test.go @@ -17,6 +17,7 @@ package badger import ( + "bytes" "math" "testing" "time" @@ -778,3 +779,37 @@ func TestL0Stall(t *testing.T) { test(t, &opt) }) } + +// Regression test for https://github.com/dgraph-io/dgraph/issues/5573 +func TestDropPrefixMoveBug(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 3, 0}, {"A", "", 1, 0}} + createAndOpen(db, l1, 1) + + l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 3, 0}, {"A", "", 1, 0}} + l21 := []keyValVersion{{"B", "", 2, 0}, {"C", "", 2, 0}} + l22 := []keyValVersion{{"F", "", 3, 0}, {"G", "", 3, 0}} + + // Level 2 has all the tables. + createAndOpen(db, l2, 2) + createAndOpen(db, l21, 2) + createAndOpen(db, l22, 2) + + require.NoError(t, db.lc.validate()) + require.NoError(t, db.DropPrefix([]byte("F"))) + + db.View(func(txn *Txn) error { + iopt := DefaultIteratorOptions + iopt.AllVersions = true + + it := txn.NewIterator(iopt) + defer it.Close() + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().Key() + require.False(t, bytes.HasPrefix(key, []byte("F"))) + } + return nil + }) + require.NoError(t, db.lc.validate()) + }) +} diff --git a/util.go b/util.go index b7f173dd3..ccf7939f3 100644 --- a/util.go +++ b/util.go @@ -60,8 +60,8 @@ func (s *levelHandler) validate() error { if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 { return errors.Errorf( - "Intra: %q vs %q: level=%d j=%d numTables=%d", - s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables) + "Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d", + hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables) } } return nil From 924613c62958fd1573db6cd40440a9b031e36fdd Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 25 Jun 2020 19:34:54 +0530 
Subject: [PATCH 3/4] Rename operations to tableGroups --- levels.go | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/levels.go b/levels.go index c217191b5..71f0a3662 100644 --- a/levels.go +++ b/levels.go @@ -284,6 +284,12 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error { } opt := s.kv.opt + // Iterate levels in the reverse order because if we were to iterate from + // lower level (say level 0) to a higher level (say level 3) we could have + // a state in which level 0 is compacted and an older version of a key exists in a higher level. + // At this point, if someone creates an iterator, they would see an old + // value for a key from the higher levels. Iterating in reverse order ensures we + // drop the oldest data first so that lookups never return stale data. for i := len(s.levels) - 1; i >= 0; i-- { l := s.levels[i] @@ -308,36 +314,37 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error { continue } - // Build a list of compaction operations affecting all the prefixes we - // need to drop. We need to build operations that satisfy the invariant that + // Build a list of compaction tableGroups affecting all the prefixes we + // need to drop. We need to build tableGroups that satisfy the invariant that // bottom tables are consecutive. - var operations [][]*table.Table - var currentOperation []*table.Table - - nextOperation := func() { - if len(currentOperation) > 0 { - operations = append(operations, currentOperation) - currentOperation = nil + // tableGroups contains groups of consecutive tables.
+ var tableGroups [][]*table.Table + var tableGroup []*table.Table + + finishGroup := func() { + if len(tableGroup) > 0 { + tableGroups = append(tableGroups, tableGroup) + tableGroup = nil } } for _, table := range l.tables { if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { - currentOperation = append(currentOperation, table) + tableGroup = append(tableGroup, table) } else { - nextOperation() + finishGroup() } } - nextOperation() + finishGroup() l.RUnlock() - if len(operations) == 0 { + if len(tableGroups) == 0 { continue } - opt.Infof("Dropping prefix at level %d (%d operations)", l.level, len(operations)) - for _, operation := range operations { + opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups)) + for _, operation := range tableGroups { cd := compactDef{ elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), thisLevel: l, From 0cbad897ca079b82f8db50b50f73e785c677f7e3 Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Thu, 25 Jun 2020 19:53:33 +0530 Subject: [PATCH 4/4] fixup --- levels_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/levels_test.go b/levels_test.go index aa9049421..6a056e8cf 100644 --- a/levels_test.go +++ b/levels_test.go @@ -17,7 +17,6 @@ package badger import ( - "bytes" "math" "testing" "time" @@ -783,12 +782,14 @@ func TestL0Stall(t *testing.T) { // Regression test for https://github.com/dgraph-io/dgraph/issues/5573 func TestDropPrefixMoveBug(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { - l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 3, 0}, {"A", "", 1, 0}} + // l1 is used to verify that drop prefix actually drops move keys from all the levels. 
+ l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}} createAndOpen(db, l1, 1) - l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 3, 0}, {"A", "", 1, 0}} - l21 := []keyValVersion{{"B", "", 2, 0}, {"C", "", 2, 0}} - l22 := []keyValVersion{{"F", "", 3, 0}, {"G", "", 3, 0}} + // Multiple levels can have the exact same move key with version. + l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}} + l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}} + l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}} // Level 2 has all the tables. createAndOpen(db, l2, 2) @@ -804,9 +805,13 @@ func TestDropPrefixMoveBug(t *testing.T) { it := txn.NewIterator(iopt) defer it.Close() + + specialKey := []byte("F") + droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)} for it.Rewind(); it.Valid(); it.Next() { key := it.Item().Key() - require.False(t, bytes.HasPrefix(key, []byte("F"))) + // Ensure we don't have any "F" or "!badger!move!F" left + require.False(t, hasAnyPrefixes(key, droppedPrefixes)) } return nil })