Skip to content

Commit

Permalink
Revert "feat(dropPrefix): add DropPrefixNonBlocking API (#1698)"
Browse files Browse the repository at this point in the history
This reverts commit da5f789.
  • Loading branch information
joshua-goldstein committed Jan 5, 2023
1 parent ff5c908 commit 36a27fb
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 245 deletions.
118 changes: 1 addition & 117 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,122 +1720,6 @@ func (db *DB) dropAll() (func(), error) {
return resume, nil
}

// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would
// not be cleared from LSM tree immediately. It would be deleted eventually through compactions.
// This operation is useful when we don't want to block writes while we delete the prefixes.
// It does this in the following way:
// - Stream the given prefixes at a given ts.
// - Write them to skiplist at the specified ts and handover that skiplist to DB.
func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error {
if db.opt.ReadOnly {
return errors.New("Attempting to drop data in read-only mode.")
}

if len(prefixes) == 0 {
return nil
}
db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes)

cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking")
defer cbuf.Release()

var wg sync.WaitGroup
handover := func(force bool) error {
if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize {
return nil
}

// Sort the kvs, add them to the builder, and hand it over to DB.
cbuf.SortSlice(func(left, right []byte) bool {
return y.CompareKeys(left, right) < 0
})

b := skl.NewBuilder(db.opt.MemTableSize)
err := cbuf.SliceIterate(func(s []byte) error {
b.Add(s, y.ValueStruct{Meta: bitDelete})
return nil
})
if err != nil {
return err
}
cbuf.Reset()
wg.Add(1)
return db.HandoverSkiplist(b.Skiplist(), wg.Done)
}

dropPrefix := func(prefix []byte) error {
stream := db.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix)
stream.Prefix = prefix
// We don't need anything except key and version.
stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) {
if !itr.Valid() {
return nil, nil
}
item := itr.Item()
if item.IsDeletedOrExpired() {
return nil, nil
}
if !bytes.Equal(key, item.Key()) {
// Return on the encounter with another key.
return nil, nil
}

a := itr.Alloc
ka := a.Copy(key)
list := &pb.KVList{}
// We need to generate only a single delete marker per key. All the versions for this
// key will be considered deleted, if we delete the one at highest version.
kv := y.NewKV(a)
kv.Key = y.KeyWithTs(ka, item.Version())
list.Kv = append(list.Kv, kv)
itr.Next()
return list, nil
}

stream.Send = func(buf *z.Buffer) error {
kv := pb.KV{}
err := buf.SliceIterate(func(s []byte) error {
kv.Reset()
if err := kv.Unmarshal(s); err != nil {
return err
}
cbuf.WriteSlice(kv.Key)
return nil
})
if err != nil {
return err
}
return handover(false)
}
if err := stream.Orchestrate(context.Background()); err != nil {
return err
}
// Flush the remaining skiplists if any.
return handover(true)
}

// Iterate over all the prefixes and logically drop them.
for _, prefix := range prefixes {
if err := dropPrefix(prefix); err != nil {
return errors.Wrapf(err, "While dropping prefix: %#x", prefix)
}
}

wg.Wait()
return nil
}

// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either drops
// the prefixes by blocking the writes or doing a logical drop.
// See DropPrefixBlocking and DropPrefixNonBlocking for more information.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
if db.opt.AllowStopTheWorld {
return db.DropPrefixBlocking(prefixes...)
}
return db.DropPrefixNonBlocking(prefixes...)
}

// DropPrefix would drop all the keys with the provided prefix. It does this in the following way:
// - Stop accepting new writes.
// - Stop memtable flushes before acquiring lock. Because we're acquring lock here
Expand All @@ -1847,7 +1731,7 @@ func (db *DB) DropPrefix(prefixes ...[]byte) error {
// - Compact L0->L1, skipping over Kp.
// - Compact rest of the levels, Li->Li, picking tables which have Kp.
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error {
func (db *DB) DropPrefix(prefixes ...[]byte) error {
if len(prefixes) == 0 {
return nil
}
Expand Down
95 changes: 0 additions & 95 deletions db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"regexp"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1056,97 +1055,3 @@ func TestKeyCount(t *testing.T) {
require.NoError(t, stream.Orchestrate(context.Background()))
require.Equal(t, N, uint64(count))
}

func TestDropPrefixNonBlocking(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

db, err := OpenManaged(DefaultOptions(dir).WithAllowStopTheWorld(false))
require.NoError(t, err)
defer db.Close()

val := []byte("value")

// Insert key-values
write := func() {
txn := db.NewTransactionAt(1, true)
defer txn.Discard()
require.NoError(t, txn.Set([]byte("aaa"), val))
require.NoError(t, txn.Set([]byte("aab"), val))
require.NoError(t, txn.Set([]byte("aba"), val))
require.NoError(t, txn.Set([]byte("aca"), val))
require.NoError(t, txn.CommitAt(2, nil))
}

read := func() {
txn := db.NewTransactionAt(6, false)
defer txn.Discard()
iterOpts := DefaultIteratorOptions
iterOpts.Prefix = []byte("aa")
it := txn.NewIterator(iterOpts)
defer it.Close()

cnt := 0
for it.Rewind(); it.Valid(); it.Next() {
fmt.Printf("%+v", it.Item())
cnt++
}

require.Equal(t, 0, cnt)
}

write()
prefixes := [][]byte{[]byte("aa")}
require.NoError(t, db.DropPrefix(prefixes...))
read()
}

func TestDropPrefixNonBlockingNoError(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
require.NoError(t, err)
defer removeDir(dir)

opt := DefaultOptions(dir)
db, err := OpenManaged(opt)
require.NoError(t, err)
defer db.Close()

clock := uint64(1)

writer := func(db *DB, shouldFail bool, closer *z.Closer) {
val := []byte("value")
defer closer.Done()
// Insert key-values
for {
select {
case <-closer.HasBeenClosed():
return
default:
txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true)
require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val)))

err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil)
if shouldFail && err != nil {
require.Error(t, err, ErrBlockedWrites)
} else if !shouldFail {
require.NoError(t, err)
}
txn.Discard()
}
}
}

closer := z.NewCloser(1)
go writer(db, true, closer)
time.Sleep(time.Millisecond * 100)
require.NoError(t, db.DropPrefixBlocking([]byte("aa")))
closer.SignalAndWait()

closer2 := z.NewCloser(1)
go writer(db, false, closer2)
time.Sleep(time.Millisecond * 50)
prefixes := [][]byte{[]byte("aa")}
require.NoError(t, db.DropPrefixNonBlocking(prefixes...))
closer2.SignalAndWait()
}
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
Expand Down Expand Up @@ -115,8 +113,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
18 changes: 0 additions & 18 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// AllowStopTheWorld determines whether the DropPrefix will be blocking/non-blocking.
AllowStopTheWorld bool

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
Expand Down Expand Up @@ -147,7 +144,6 @@ func DefaultOptions(path string) Options {
MaxLevels: 7,
NumGoroutines: 8,
MetricsEnabled: true,
AllowStopTheWorld: true,

NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0.
NumLevelZeroTables: 5,
Expand Down Expand Up @@ -683,20 +679,6 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat
return opt
}

// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value.
//
// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not.
// When set to false, the DropPrefix will do a logical delete and will not block
// the writes. Although, this will not immediately clear up the LSM tree.
// When set to false, the DropPrefix will block the writes and will clear up the LSM
// tree.
//
// The default value of AllowStopTheWorld is true.
func (opt Options) WithAllowStopTheWorld(b bool) Options {
opt.AllowStopTheWorld = b
return opt
}

// WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value.
//
// This value specifies how much data cache should hold in memory. A small size
Expand Down
12 changes: 1 addition & 11 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,6 @@ func NewTableBuilder(opts Options) *Builder {
return b
}

func maxEncodedLen(ctype options.CompressionType, sz int) int {
switch ctype {
case options.Snappy:
return snappy.MaxEncodedLen(sz)
case options.ZSTD:
return y.ZSTDCompressBound(sz)
}
return sz
}

func (b *Builder) handleBlock() {
defer b.wg.Done()

Expand All @@ -189,7 +179,7 @@ func (b *Builder) handleBlock() {
// BlockBuf should always less than or equal to allocated space. If the blockBuf is greater
// than allocated space that means the data from this block cannot be stored in its
// existing location.
allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1
allocatedSpace := (item.end) + padding + 1
y.AssertTrue(len(blockBuf) <= allocatedSpace)

// blockBuf was allocated on allocator. So, we don't need to copy it over.
Expand Down

0 comments on commit 36a27fb

Please sign in to comment.