Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dropPrefix): add DropPrefixNonBlocking API #1698

Merged
merged 7 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ var requestPool = sync.Pool{
}

func (db *DB) writeToLSM(b *request) error {
db.lock.RLock()
defer db.lock.RUnlock()
for i, entry := range b.Entries {
var err error
if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) {
Expand Down Expand Up @@ -1036,10 +1038,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {

// Iterate over the skiplist and send the entries to the publisher.
it := skl.NewIterator()
it.SeekToFirst()

var entries []*Entry
for it.Valid() {
for it.SeekToFirst(); it.Valid(); it.Next() {
v := it.Value()
e := &Entry{
Key: it.Key(),
Expand All @@ -1048,7 +1049,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error {
UserMeta: v.UserMeta,
}
entries = append(entries, e)
it.Next()
}
req := &request{
Entries: entries,
Expand Down Expand Up @@ -1836,6 +1836,113 @@ func (db *DB) dropAll() (func(), error) {
return resume, nil
}

// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
// not be cleared from LSM tree immediately. It would be deleted eventually through compactions.
// This operation is useful when we don't want to block writes while we delete the prefixes.
// It does this in the following way:
// - Stream the given prefixes at a given ts.
// - Write them to skiplist at the specified ts and handover that skiplist to DB.
func (db *DB) DropPrefixNonBlocking(ts uint64, prefixes ...[]byte) error {
if db.opt.ReadOnly {
return errors.New("Attempting to drop data in read-only mode.")
}

if len(prefixes) == 0 {
return nil
}
db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)

dropPrefix := func(prefix []byte) error {
stream := db.NewStreamAt(ts)
stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
stream.Prefix = prefix
// Use the default implementation with some changes. We don't need anything except key.
stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
a := itr.Alloc
ka := a.Copy(key)

list := &pb.KVList{}
for ; itr.Valid(); itr.Next() {
item := itr.Item()
if item.IsDeletedOrExpired() {
break
}
if !bytes.Equal(key, item.Key()) {
// Break out on the first encounter with another key.
break
}

kv := y.NewKV(a)
kv.Key = ka
list.Kv = append(list.Kv, kv)

if db.opt.NumVersionsToKeep == 1 {
break
}

if item.DiscardEarlierVersions() {
break
}
}
return list, nil
}

var wg sync.WaitGroup
builderMap := make(map[uint32]*skl.Builder)
initSize := int64(float64(db.opt.MemTableSize) * 1.1)

handover := func(force bool) error {
for id, b := range builderMap {
sl := b.Skiplist()
if force || sl.MemSize() > db.opt.MemTableSize {
wg.Add(1)
if err := db.HandoverSkiplist(sl, wg.Done); err != nil {
return err
}
// Create a fresh builder.
builderMap[id] = skl.NewBuilder(initSize)
}
}
return nil
}

stream.Send = func(buf *z.Buffer) error {
err := buf.SliceIterate(func(s []byte) error {
var kv pb.KV
if err := kv.Unmarshal(s); err != nil {
return err
}
if _, ok := builderMap[kv.StreamId]; !ok {
builderMap[kv.StreamId] = skl.NewBuilder(initSize)
}
builderMap[kv.StreamId].Add(y.KeyWithTs(kv.Key, ts), y.ValueStruct{Meta: bitDelete})
return nil
})
if err != nil {
return err
}
return handover(false)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
// Flush the remaining skiplists if any.
if err := handover(true); err != nil {
return err
}
wg.Wait()
NamanJain8 marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Iterate over all the prefixes and logically drop them.
for _, prefix := range prefixes {
if err := dropPrefix(prefix); err != nil {
return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
}
}
return nil
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquiring lock here
Expand Down
97 changes: 97 additions & 0 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"regexp"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1055,3 +1056,99 @@ func TestKeyCount(t *testing.T) {
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}

// TestDropPrefixNonBlocking checks that DropPrefixNonBlocking logically removes every
// key under the given prefix, and that re-writing the same entries at the original
// timestamp still leaves them invisible to readers (vlog rewrites can't resurrect them).
func TestDropPrefixNonBlocking(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	db, err := OpenManaged(DefaultOptions(dir))
	require.NoError(t, err)
	defer db.Close()

	val := []byte("value")

	// Insert key-values
	write := func() {
		txn := db.NewTransactionAt(1, true)
		defer txn.Discard()
		for _, key := range []string{"aaa", "aab", "aba", "aca"} {
			require.NoError(t, txn.Set([]byte(key), val))
		}
		require.NoError(t, txn.CommitAt(2, nil))
	}

	// read asserts that no key with prefix "aa" is visible at ts=6.
	read := func() {
		txn := db.NewTransactionAt(6, false)
		opts := DefaultIteratorOptions
		opts.Prefix = []byte("aa")
		itr := txn.NewIterator(opts)
		defer itr.Close()

		count := 0
		for itr.Rewind(); itr.Valid(); itr.Next() {
			count++
		}
		require.Equal(t, 0, count)
	}

	write()
	require.NoError(t, db.DropPrefixNonBlocking(5, []byte("aa")))
	read()

	// Writing again at same timestamp and verifying that vlog rewrites don't allow us to read
	// these entries anyway.
	write()
	read()
}

// TestDropPrefixNonBlockingNoError checks that writes racing with the blocking
// DropPrefix fail only with ErrBlockedWrites, while writes racing with
// DropPrefixNonBlocking never fail at all.
func TestDropPrefixNonBlockingNoError(t *testing.T) {
	dir, err := ioutil.TempDir("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opt := DefaultOptions(dir)
	db, err := OpenManaged(opt)
	require.NoError(t, err)
	defer db.Close()

	// Logical commit clock shared by the writer goroutines; advanced atomically.
	clock := uint64(1)

	// writer keeps committing entries until closer is signalled. When shouldFail is
	// set, any commit error must be ErrBlockedWrites; otherwise no error is tolerated.
	writer := func(db *DB, shouldFail bool, closer *z.Closer) {
		val := []byte("value")
		defer closer.Done()
		// Insert key-values
		for {
			select {
			case <-closer.HasBeenClosed():
				return
			default:
				txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true)
				require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val)))

				err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil)
				if shouldFail && err != nil {
					// FIX: the original called require.Error(t, err, ErrBlockedWrites),
					// which treats ErrBlockedWrites as a log message and only asserts
					// err != nil — already guaranteed by the guard above. ErrorIs pins
					// the error we actually expect.
					require.ErrorIs(t, err, ErrBlockedWrites)
				} else if !shouldFail {
					require.NoError(t, err)
				}
			}
		}
	}

	// The blocking DropPrefix must reject concurrent writes with ErrBlockedWrites.
	closer := z.NewCloser(1)
	go writer(db, true, closer)
	time.Sleep(time.Millisecond * 100)
	require.NoError(t, db.DropPrefix([]byte("aa")))
	closer.SignalAndWait()

	// The non-blocking variant must let concurrent writes succeed.
	closer2 := z.NewCloser(1)
	go writer(db, false, closer2)
	time.Sleep(time.Millisecond * 50)
	prefixes := [][]byte{[]byte("aa")}
	require.NoError(t, db.DropPrefixNonBlocking(atomic.AddUint64(&clock, 1), prefixes...))
	closer2.SignalAndWait()
}
12 changes: 11 additions & 1 deletion table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder {
return b
}

// maxEncodedLen returns an upper bound on the encoded size of a payload of sz
// bytes under the given compression type. With no (or unknown) compression the
// bound is sz itself.
func maxEncodedLen(ctype options.CompressionType, sz int) int {
	if ctype == options.Snappy {
		return snappy.MaxEncodedLen(sz)
	}
	if ctype == options.ZSTD {
		return y.ZSTDCompressBound(sz)
	}
	return sz
}

func (b *Builder) handleBlock() {
defer b.wg.Done()

Expand All @@ -175,7 +185,7 @@ func (b *Builder) handleBlock() {
// BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
// than allocated space that means the data from this block cannot be stored in its
// existing location.
allocatedSpace := (item.end) + padding + 1
allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
y.AssertTrue(len(blockBuf) <= allocatedSpace)

// blockBuf was allocated on allocator. So, we don't need to copy it over.
Expand Down