change the way purge works (#392)
* Change the way purge works
* Stop searching as soon as the value is found, instead of searching all tables
* Define ErrPurged to be nil for windows/osx
* Update GC stats in the background
* Ensure all versions of a key are always written to the same table
* Add a test that punches holes twice in the same file and deletes it later
Janardhan Reddy authored Jan 10, 2018
1 parent 1138454 commit a44a56e
Showing 8 changed files with 350 additions and 121 deletions.
199 changes: 126 additions & 73 deletions db.go
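Editorial note: before the diff, a minimal sketch of how the reworked purge is driven from user code. It assumes the Badger v1.x API of this era (badger.DefaultOptions as a value, Dir/ValueDir set by hand); the paths, key, and version number are illustrative only.

package main

import (
	"log"

	"github.com/dgraph-io/badger"
)

func main() {
	opts := badger.DefaultOptions
	opts.Dir = "/tmp/badger"      // illustrative path
	opts.ValueDir = "/tmp/badger" // illustrative path
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// After this commit, PurgeVersionsBelow no longer deletes versions
	// inline: it writes a "!badger!purge" marker holding the cutoff
	// version and queues a GC-stats update for the background worker.
	if err := db.PurgeVersionsBelow([]byte("some-key"), 42); err != nil {
		log.Fatal(err)
	}
}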
@@ -39,9 +39,10 @@ import (
)

var (
badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
head = []byte("!badger!head") // For storing value offset for replay.
txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
badgerPrefix = []byte("!badger!") // Prefix for internal keys used by badger.
head = []byte("!badger!head") // For storing value offset for replay.
txnKey = []byte("!badger!txn") // For indicating end of entries in txn.
purgePrefix = []byte("!badger!purge") // Stores the version below which we need to purge.
)

type closers struct {
@@ -50,6 +51,7 @@ type closers struct {
memtable *y.Closer
writes *y.Closer
valueGC *y.Closer
gcStats *y.Closer
}

// DB provides the various functions required to interact with Badger.
@@ -61,17 +63,18 @@ type DB struct {
// nil if Dir and ValueDir are the same
valueDirGuard *directoryLockGuard

closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
closers closers
elog trace.EventLog
mt *skl.Skiplist // Our latest (actively written) in-memory table
imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
opt Options
manifest *manifestFile
lc *levelsController
vlog valueLog
vptr valuePointer // less than or equal to a pointer to the last vlog value put into mt
writeCh chan *request
flushChan chan flushTask // For flushing memtables.
purgeUpdateCh chan purgeUpdate // For updating GcStats

orc *oracle
}
@@ -237,6 +240,7 @@ func Open(opt Options) (db *DB, err error) {
imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
flushChan: make(chan flushTask, opt.NumMemtables),
writeCh: make(chan *request, kvWriteChCapacity),
purgeUpdateCh: make(chan purgeUpdate, 1000),
opt: opt,
manifest: manifestFile,
elog: trace.NewEventLog("Badger", "DB"),
@@ -309,6 +313,9 @@ func Open(opt Options) (db *DB, err error) {
db.closers.valueGC = y.NewCloser(1)
go db.vlog.waitOnGC(db.closers.valueGC)

db.closers.gcStats = y.NewCloser(1)
go db.runUpdateGCStats(db.closers.gcStats)

valueDirLockGuard = nil
dirLockGuard = nil
manifestFile = nil
@@ -322,6 +329,9 @@ func (db *DB) Close() (err error) {
// Stop value GC first.
db.closers.valueGC.SignalAndWait()

// Stop GC stats update.
db.closers.gcStats.SignalAndWait()

// Stop writes next.
db.closers.writes.SignalAndWait()

@@ -448,33 +458,25 @@ func (db *DB) getMemTables() ([]*skl.Skiplist, func()) {

// get returns the value in memtable or disk for given key.
// Note that value will include meta byte.
// IMPORTANT: We should never write an entry with an older timestamp for the same key.
// We need to maintain this invariant to be able to search for the latest value of a key,
// or else we would need to search all tables and find the max version among them.
// To maintain this invariant, we also need to ensure that all versions of a key are
// always present in the same table from level 1, because compaction can push any
// table down.
func (db *DB) get(key []byte) (y.ValueStruct, error) {
tables, decr := db.getMemTables() // Lock should be released.
defer decr()

y.NumGets.Add(1)
version := y.ParseTs(key)
var maxVs y.ValueStruct
// Need to search for values in all tables: with a managed DB, the
// latest value needn't be present in the latest table, and even
// without a managed DB, purging can cause this constraint to be
// violated.
// Search until the required version is found, or iterate over all
// tables and return the max version.
for i := 0; i < len(tables); i++ {
vs := tables[i].Get(key)
y.NumMemtableGets.Add(1)
if vs.Meta == 0 && vs.Value == nil {
continue
}
if vs.Version == version {
if vs.Meta != 0 || vs.Value != nil {
return vs, nil
}
if maxVs.Version < vs.Version {
maxVs = vs
}
}
return db.lc.get(key, maxVs)
return db.lc.get(key)
}
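Editorial note: a toy model of the early-exit lookup the commit introduces (plain maps stand in for Badger's memtables/SSTables; none of these names are from the diff). Because all versions of a key now live in the same table, the first table that contains the key at all holds its latest version, so the scan can stop there:

package main

import "fmt"

type version struct {
	ts    uint64
	value string
}

// get scans tables from newest to oldest. Under the invariant above,
// the first table containing the key has the latest version of it.
func get(tables []map[string]version, key string) (version, bool) {
	for _, t := range tables {
		if v, ok := t[key]; ok {
			return v, true // early exit: no need to scan older tables
		}
	}
	return version{}, false
}

func main() {
	t0 := map[string]version{"a": {ts: 7, value: "v7"}} // newest table
	t1 := map[string]version{"b": {ts: 3, value: "v3"}} // older table
	v, ok := get([]map[string]version{t0, t1}, "a")
	fmt.Println(v, ok) // {7 v7} true — found in the first table, scan stopped
}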

func (db *DB) updateOffset(ptrs []valuePointer) {
@@ -888,40 +890,85 @@ func (db *DB) updateSize(lc *y.Closer) {
}
}

// PurgeVersionsBelow will delete all versions of a key below the specified version
func (db *DB) PurgeVersionsBelow(key []byte, ts uint64) error {
txn := db.NewTransaction(false)
defer txn.Discard()
return db.purgeVersionsBelow(txn, key, ts)
func (db *DB) runUpdateGCStats(lc *y.Closer) {
defer lc.Done()
for {
select {
case t := <-db.purgeUpdateCh:
txn := db.NewTransaction(false)
db.updateGCStats(txn, t)
txn.Discard()
case <-lc.HasBeenClosed():
return
}
}
}
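Editorial note: runUpdateGCStats uses Badger's usual y.Closer handshake for background workers. A stripped-down sketch of that lifecycle; workCh and the processing body are illustrative stand-ins, while the y.Closer method names match Badger's internal y package of this era:

package main

import (
	"fmt"

	"github.com/dgraph-io/badger/y"
)

func main() {
	workCh := make(chan int, 10) // stand-in for db.purgeUpdateCh

	c := y.NewCloser(1) // one worker for SignalAndWait to wait on
	go func() {
		defer c.Done() // unblocks SignalAndWait on exit
		for {
			select {
			case t := <-workCh:
				fmt.Println("processing", t) // stand-in for updateGCStats
			case <-c.HasBeenClosed(): // fires once Signal* is called
				return
			}
		}
	}()

	workCh <- 1
	c.SignalAndWait() // what db.Close does for closers.gcStats
}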

func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
func purgeKey(key []byte) []byte {
return y.KeyWithTs(append(purgePrefix, key...), 1)
}

func (db *DB) purgeTs(key []byte) uint64 {
vs, err := db.get(purgeKey(key))
if err != nil {
return 0
} else if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
// If the purge key is deleted, then purgeTs would be zero,
// but we never delete purge keys.
return 0
} else if len(vs.Value) > 0 {
return binary.BigEndian.Uint64(vs.Value)
}
return 0
}
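Editorial note: the value stored under the purge marker is just a big-endian uint64; PutUint64 fills the first 8 bytes of the 10-byte buffer and Uint64 reads the same 8 back, so the two trailing bytes are unused. A self-contained round-trip of that encoding:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	ts := uint64(42)

	// Encode as PurgeVersionsBelow does: big-endian uint64 in a 10-byte buffer.
	buf := make([]byte, 10)
	binary.BigEndian.PutUint64(buf, ts)

	// Decode as purgeTs does: Uint64 only reads the first 8 bytes.
	got := binary.BigEndian.Uint64(buf)
	fmt.Println(got == ts) // true
}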

type purgeUpdate struct {
key []byte
from uint64
end uint64
}

func (db *DB) updateGCStats(txn *Txn, t purgeUpdate) {
opts := DefaultIteratorOptions
opts.AllVersions = true
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()

var entries []*Entry

for it.Seek(key); it.ValidForPrefix(key); it.Next() {
for it.Seek(t.key); it.ValidForPrefix(t.key); it.Next() {
item := it.Item()
if !bytes.Equal(key, item.Key()) || item.Version() >= ts {
if !bytes.Equal(t.key, item.Key()) {
break
} else if item.Version() > t.end {
continue
}
if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
} else if item.Version() < t.from {
break
} else if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
continue
}

// Found an older version. Mark for deletion
entries = append(entries,
&Entry{
Key: y.KeyWithTs(key, item.version),
meta: bitDelete,
})
db.vlog.updateGCStats(item)
}
return db.batchSet(entries)
}

// PurgeVersionsBelow will delete all versions of a key below the specified version
func (db *DB) PurgeVersionsBelow(key []byte, ts uint64) error {
updateGcTask := purgeUpdate{
key: key,
from: db.purgeTs(key),
end: ts - 1,
}
select {
case db.purgeUpdateCh <- updateGcTask:
default:
}

buf := make([]byte, 10)
binary.BigEndian.PutUint64(buf, ts)
e := &Entry{
Key: purgeKey(key),
Value: buf,
}
return db.batchSet([]*Entry{e})
}
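Editorial note: the select with an empty default in PurgeVersionsBelow is Go's non-blocking send. If the 1000-slot purgeUpdateCh is full, the GC-stats hint is dropped rather than stalling the caller, which is safe because the durable purge marker is still written. The pattern in isolation:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 1 // fill the buffer

	// Non-blocking send: if the buffered channel is full, take the
	// default branch instead of blocking.
	select {
	case ch <- 2:
		fmt.Println("queued")
	default:
		fmt.Println("dropped") // printed here: buffer is full
	}
}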

// PurgeOlderVersions deletes older versions of all keys.
@@ -933,6 +980,7 @@ func (db *DB) purgeVersionsBelow(txn *Txn, key []byte, ts uint64) error {
func (db *DB) PurgeOlderVersions() error {
return db.View(func(txn *Txn) error {
opts := DefaultIteratorOptions
// We need to use AllVersions, otherwise we won't get deleted keys in the merge iterator.
opts.AllVersions = true
opts.PrefetchValues = false
it := txn.NewIterator(opts)
@@ -963,37 +1011,43 @@ func (db *DB) PurgeOlderVersions() error {
}
}

// Since the older versions of a value are not deleted in the LSM tree, we need to
// reset gcStats, or else the same entry would be counted as discarded every time
// we call PurgeOlderVersions.
db.vlog.resetGCStats()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
// This is latest version for this key.
if !bytes.Equal(lastKey, item.Key()) {
lastKey = y.SafeCopy(lastKey, item.Key())
buf := make([]byte, 10)
binary.BigEndian.PutUint64(buf, item.Version())
e := &Entry{
Key: purgeKey(lastKey),
Value: buf,
}

curSize := e.estimateSize(db.opt.ValueThreshold)
// Batch up min(1000, maxBatchCount) entries at a time and write them out,
// ensuring that the total batch size doesn't exceed maxBatchSize.
if count == 1000 || count+1 >= int(db.opt.maxBatchCount) ||
size+curSize >= int(db.opt.maxBatchSize) {
if err := batchSetAsyncIfNoErr(entries); err != nil {
return err
}
count = 0
size = 0
entries = []*Entry{}
}
size += curSize
count++
entries = append(entries, e)
continue
}

if isDeletedOrExpired(item.meta, item.ExpiresAt()) {
continue
}
// Found an older version. Mark for deletion
e := &Entry{
Key: y.KeyWithTs(lastKey, item.version),
meta: bitDelete,
}
db.vlog.updateGCStats(item)
curSize := e.estimateSize(db.opt.ValueThreshold)

// Batch up min(1000, maxBatchCount) entries at a time and write them out,
// ensuring that the total batch size doesn't exceed maxBatchSize.
if count == 1000 || count+1 >= int(db.opt.maxBatchCount) ||
size+curSize >= int(db.opt.maxBatchSize) {
if err := batchSetAsyncIfNoErr(entries); err != nil {
return err
}
count = 0
size = 0
entries = []*Entry{}
}
size += curSize
count++
entries = append(entries, e)
}

// Write the last batch of pending deletes
@@ -1050,8 +1104,7 @@ func (db *DB) RunValueLogGC(discardRatio float64) error {
// Find head on disk
headKey := y.KeyWithTs(head, math.MaxUint64)
// Need to pass the key with a timestamp; the LSM get removes the last 8 bytes and compares the key
var maxVs y.ValueStruct
val, err := db.lc.get(headKey, maxVs)
val, err := db.lc.get(headKey)
if err != nil {
return errors.Wrap(err, "Retrieving head from on-disk LSM")
}
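Editorial note: a sketch of the versioned-key layout behind that headKey lookup. It assumes y.KeyWithTs appends the inverted timestamp in big-endian form (so higher versions sort first under byte order), which is why passing math.MaxUint64 seeks to the newest version, and why the LSM get strips the last 8 bytes before comparing keys:

package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// keyWithTs mirrors what y.KeyWithTs is assumed to do: append the
// inverted version so newer versions of a key sort first byte-wise.
func keyWithTs(key []byte, ts uint64) []byte {
	out := make([]byte, len(key)+8)
	copy(out, key)
	binary.BigEndian.PutUint64(out[len(key):], math.MaxUint64-ts)
	return out
}

func main() {
	head := []byte("!badger!head")
	fmt.Printf("%x\n", keyWithTs(head, math.MaxUint64)) // suffix 0000000000000000: sorts before every real version
	fmt.Printf("%x\n", keyWithTs(head, 1))              // suffix fffffffffffffffe
}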