diff --git a/db.go b/db.go index 0b6754077..f8fcc2708 100644 --- a/db.go +++ b/db.go @@ -856,7 +856,7 @@ func writeLevel0Table(ft flushTask, f io.Writer) error { b := table.NewTableBuilder() defer b.Close() for iter.SeekToFirst(); iter.Valid(); iter.Next() { - if len(ft.dropPrefix) > 0 && bytes.HasPrefix(iter.Key(), ft.dropPrefix) { + if len(ft.dropPrefixes) > 0 && hasAnyPrefixes(iter.Key(), ft.dropPrefixes) { continue } b.Add(iter.Key(), iter.Value()) @@ -866,9 +866,9 @@ func writeLevel0Table(ft flushTask, f io.Writer) error { } type flushTask struct { - mt *skl.Skiplist - vptr valuePointer - dropPrefix []byte + mt *skl.Skiplist + vptr valuePointer + dropPrefixes [][]byte } // handleFlushTask must be run serially. @@ -1452,7 +1452,8 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefix []byte) error { +func (db *DB) DropPrefix(prefixes ...[]byte) error { + db.opt.Infof("DropPrefix Called") f := db.prepareToDrop() defer f() // Block all foreign interactions with memory tables. @@ -1468,8 +1469,8 @@ func (db *DB) DropPrefix(prefix []byte) error { task := flushTask{ mt: memtable, // Ensure that the head of value log gets persisted to disk. - vptr: db.vhead, - dropPrefix: prefix, + vptr: db.vhead, + dropPrefixes: prefixes, } db.opt.Debugf("Flushing memtable") if err := db.handleFlushTask(task); err != nil { @@ -1484,7 +1485,7 @@ func (db *DB) DropPrefix(prefix []byte) error { db.mt = skl.NewSkiplist(arenaSize(db.opt)) // Drop prefixes from the levels. 
- if err := db.lc.dropPrefix(prefix); err != nil { + if err := db.lc.dropPrefixes(prefixes); err != nil { return err } db.opt.Infof("DropPrefix done") diff --git a/levels.go b/levels.go index b823dc346..61dcb60dd 100644 --- a/levels.go +++ b/levels.go @@ -262,9 +262,25 @@ func (s *levelsController) dropTree() (int, error) { // tables who only have keys with this prefix are quickly dropped. The ones which have other keys // are run through MergeIterator and compacted to create new tables. All the mechanisms of // compactions apply, i.e. level sizes and MANIFEST are updated as in the normal flow. -func (s *levelsController) dropPrefix(prefix []byte) error { +func (s *levelsController) dropPrefixes(prefixes [][]byte) error { + // Internal move keys related to the given prefix should also be skipped. + for _, prefix := range prefixes { + key := make([]byte, 0, len(badgerMove)+len(prefix)) + key = append(key, badgerMove...) + key = append(key, prefix...) + prefixes = append(prefixes, key) + } + opt := s.kv.opt - for _, l := range s.levels { + // Iterate levels in the reverse order because if we were to iterate from + // lower level (say level 0) to a higher level (say level 3) we could have + // a state in which level 0 is compacted and an older version of a key still exists in a higher level. + // At this point, if someone creates an iterator, they would see an old + // value for a key from the higher levels. Iterating in reverse order ensures we + // drop the oldest data first so that lookups never return stale data. + for i := len(s.levels) - 1; i >= 0; i-- { + l := s.levels[i] + l.RLock() if l.level == 0 { size := len(l.tables) @@ -276,7 +292,7 @@ func (s *levelsController) dropPrefix(prefix []byte) error { score: 1.74, // A unique number greater than 1.0 does two things. Helps identify this // function in logs, and forces a compaction. 
- dropPrefix: prefix, + dropPrefixes: prefixes, } if err := s.doCompact(cp); err != nil { opt.Warningf("While compacting level 0: %v", err) @@ -286,39 +302,49 @@ func (s *levelsController) dropPrefix(prefix []byte) error { continue } - var tables []*table.Table - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, prefix...) - prefixesToSkip := [][]byte{prefix, moveKeyForPrefix} - for _, table := range l.tables { - var absent bool - switch { - case hasAnyPrefixes(table.Smallest(), prefixesToSkip): - case hasAnyPrefixes(table.Biggest(), prefixesToSkip): - case containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixesToSkip): - default: - absent = true + // Build a list of compaction tableGroups affecting all the prefixes we + // need to drop. We need to build tableGroups that satisfy the invariant that + // bottom tables are consecutive. + // tableGroup contains groups of consecutive tables. + var tableGroups [][]*table.Table + var tableGroup []*table.Table + + finishGroup := func() { + if len(tableGroup) > 0 { + tableGroups = append(tableGroups, tableGroup) + tableGroup = nil } - if !absent { - tables = append(tables, table) + } + + for _, table := range l.tables { + if containsAnyPrefixes(table.Smallest(), table.Biggest(), prefixes) { + tableGroup = append(tableGroup, table) + } else { + finishGroup() } } + finishGroup() + l.RUnlock() - if len(tables) == 0 { + + if len(tableGroups) == 0 { continue } - cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), - thisLevel: l, - nextLevel: l, - top: []*table.Table{}, - bot: tables, - dropPrefix: prefix, - } - if err := s.runCompactDef(l.level, cd); err != nil { - opt.Warningf("While running compact def: %+v. 
Error: %v", cd, err) - return err + opt.Infof("Dropping prefix at level %d (%d tableGroups)", l.level, len(tableGroups)) + for _, operation := range tableGroups { + cd := compactDef{ + elog: trace.New(fmt.Sprintf("Badger.L%d", l.level), "Compact"), + thisLevel: l, + nextLevel: l, + top: nil, + bot: operation, + dropPrefixes: prefixes, + } + if err := s.runCompactDef(l.level, cd); err != nil { + opt.Warningf("While running compact def: %+v. Error: %v", cd, err) + return err + } } } return nil @@ -380,9 +406,9 @@ func (l *levelHandler) isCompactable(delSize int64) bool { } type compactionPriority struct { - level int - score float64 - dropPrefix []byte + level int + score float64 + dropPrefixes [][]byte } // pickCompactLevel determines which level to compact. @@ -470,13 +496,19 @@ func (s *levelsController) compactBuildTables( // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. var valid []*table.Table + +nextTable: for _, table := range botTables { - if len(cd.dropPrefix) > 0 && - bytes.HasPrefix(table.Smallest(), cd.dropPrefix) && - bytes.HasPrefix(table.Biggest(), cd.dropPrefix) { - // All the keys in this table have the dropPrefix. So, this table does not need to be - // in the iterator and can be dropped immediately. - continue + if len(cd.dropPrefixes) > 0 { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(table.Smallest(), prefix) && + bytes.HasPrefix(table.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this + // table does not need to be in the iterator and can be + // dropped immediately. + continue nextTable + } + } } valid = append(valid, table) } @@ -503,12 +535,9 @@ func (s *levelsController) compactBuildTables( timeStart := time.Now() builder := table.NewTableBuilder() var numKeys, numSkips uint64 - // Internal move keys related to the given prefix should also be skipped. - moveKeyForPrefix := append(badgerMove, cd.dropPrefix...) 
- prefixesToSkip := [][]byte{cd.dropPrefix, moveKeyForPrefix} for ; it.Valid(); it.Next() { // See if we need to skip the prefix. - if len(cd.dropPrefix) > 0 && hasAnyPrefixes(it.Key(), prefixesToSkip) { + if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { numSkips++ updateStats(it.Value()) continue @@ -672,10 +701,24 @@ func hasAnyPrefixes(s []byte, listOfPrefixes [][]byte) bool { return false } +func containsPrefix(smallValue, largeValue, prefix []byte) bool { + if bytes.HasPrefix(smallValue, prefix) { + return true + } + if bytes.HasPrefix(largeValue, prefix) { + return true + } + if bytes.Compare(prefix, smallValue) > 0 && + bytes.Compare(prefix, largeValue) < 0 { + return true + } + + return false +} + func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) bool { for _, prefix := range listOfPrefixes { - if bytes.Compare(prefix, smallValue) > 0 && - bytes.Compare(prefix, largeValue) < 0 { + if containsPrefix(smallValue, largeValue, prefix) { return true } } @@ -697,7 +740,7 @@ type compactDef struct { thisSize int64 - dropPrefix []byte + dropPrefixes [][]byte } func (cd *compactDef) lockLevels() { @@ -859,10 +902,10 @@ func (s *levelsController) doCompact(p compactionPriority) error { y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. 
cd := compactDef{ - elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), - thisLevel: s.levels[l], - nextLevel: s.levels[l+1], - dropPrefix: p.dropPrefix, + elog: trace.New(fmt.Sprintf("Badger.L%d", l), "Compact"), + thisLevel: s.levels[l], + nextLevel: s.levels[l+1], + dropPrefixes: p.dropPrefixes, } cd.elog.SetMaxEvents(100) defer cd.elog.Finish() diff --git a/levels_test.go b/levels_test.go index 3891dc36d..fd2bbb55f 100644 --- a/levels_test.go +++ b/levels_test.go @@ -638,3 +638,43 @@ func TestDiscardFirstVersion(t *testing.T) { getAllAndCheck(t, db, ExpectedKeys) }) } + +// Regression test for https://github.com/dgraph-io/dgraph/issues/5573 +func TestDropPrefixMoveBug(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // l1 is used to verify that drop prefix actually drops move keys from all the levels. + l1 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}} + createAndOpen(db, l1, 1) + + // Multiple levels can have the exact same move key with version. + l2 := []keyValVersion{{string(append(badgerMove, "F"...)), "", 0, 0}, {"A", "", 0, 0}} + l21 := []keyValVersion{{"B", "", 0, 0}, {"C", "", 0, 0}} + l22 := []keyValVersion{{"F", "", 0, 0}, {"G", "", 0, 0}} + + // Level 2 has all the tables. 
+ createAndOpen(db, l2, 2) + createAndOpen(db, l21, 2) + createAndOpen(db, l22, 2) + + require.NoError(t, db.lc.validate()) + require.NoError(t, db.DropPrefix([]byte("F"))) + + db.View(func(txn *Txn) error { + iopt := DefaultIteratorOptions + iopt.AllVersions = true + + it := txn.NewIterator(iopt) + defer it.Close() + + specialKey := []byte("F") + droppedPrefixes := [][]byte{specialKey, append(badgerMove, specialKey...)} + for it.Rewind(); it.Valid(); it.Next() { + key := it.Item().Key() + // Ensure we don't have any "F" or "!badger!move!F" left + require.False(t, hasAnyPrefixes(key, droppedPrefixes)) + } + return nil + }) + require.NoError(t, db.lc.validate()) + }) +} diff --git a/util.go b/util.go index c5173e26c..2726b7ad5 100644 --- a/util.go +++ b/util.go @@ -60,8 +60,8 @@ func (s *levelHandler) validate() error { if y.CompareKeys(s.tables[j].Smallest(), s.tables[j].Biggest()) > 0 { return errors.Errorf( - "Intra: %q vs %q: level=%d j=%d numTables=%d", - s.tables[j].Smallest(), s.tables[j].Biggest(), s.level, j, numTables) + "Intra: \n%s\n vs \n%s\n: level=%d j=%d numTables=%d", + hex.Dump(s.tables[j].Smallest()), hex.Dump(s.tables[j].Biggest()), s.level, j, numTables) } } return nil