diff --git a/db.go b/db.go index 09fc432cc..e05e22288 100644 --- a/db.go +++ b/db.go @@ -763,6 +763,8 @@ var requestPool = sync.Pool{ } func (db *DB) writeToLSM(b *request) error { + db.lock.RLock() + defer db.lock.RUnlock() for i, entry := range b.Entries { var err error if db.opt.managedTxns || entry.skipVlogAndSetThreshold(db.valueThreshold()) { @@ -1036,10 +1038,9 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error { // Iterate over the skiplist and send the entries to the publisher. it := skl.NewIterator() - it.SeekToFirst() var entries []*Entry - for it.Valid() { + for it.SeekToFirst(); it.Valid(); it.Next() { v := it.Value() e := &Entry{ Key: it.Key(), @@ -1048,7 +1049,6 @@ func (db *DB) HandoverSkiplist(skl *skl.Skiplist, callback func()) error { UserMeta: v.UserMeta, } entries = append(entries, e) - it.Next() } req := &request{ Entries: entries, @@ -1836,6 +1836,122 @@ func (db *DB) dropAll() (func(), error) { return resume, nil } +// DropPrefixNonBlocking would logically drop all the keys with the provided prefix. The data would +// not be cleared from LSM tree immediately. It would be deleted eventually through compactions. +// This operation is useful when we don't want to block writes while we delete the prefixes. +// It does this in the following way: +// - Stream the given prefixes at a given ts. +// - Write them to skiplist at the specified ts and handover that skiplist to DB. +func (db *DB) DropPrefixNonBlocking(prefixes ...[]byte) error { + if db.opt.ReadOnly { + return errors.New("Attempting to drop data in read-only mode.") + } + + if len(prefixes) == 0 { + return nil + } + db.opt.Infof("Non-blocking DropPrefix called for %s", prefixes) + + cbuf := z.NewBuffer(int(db.opt.MemTableSize), "DropPrefixNonBlocking") + defer cbuf.Release() + + var wg sync.WaitGroup + handover := func(force bool) error { + if !force && int64(cbuf.LenNoPadding()) < db.opt.MemTableSize { + return nil + } + + // Sort the kvs, add them to the builder, and hand it over to DB. + cbuf.SortSlice(func(left, right []byte) bool { + return y.CompareKeys(left, right) < 0 + }) + + b := skl.NewBuilder(db.opt.MemTableSize) + err := cbuf.SliceIterate(func(s []byte) error { + b.Add(s, y.ValueStruct{Meta: bitDelete}) + return nil + }) + if err != nil { + return err + } + cbuf.Reset() + wg.Add(1) + return db.HandoverSkiplist(b.Skiplist(), wg.Done) + } + + dropPrefix := func(prefix []byte) error { + stream := db.NewStreamAt(math.MaxUint64) + stream.LogPrefix = fmt.Sprintf("Dropping prefix: %#x", prefix) + stream.Prefix = prefix + // We don't need anything except key and version. + stream.KeyToList = func(key []byte, itr *Iterator) (*pb.KVList, error) { + if !itr.Valid() { + return nil, nil + } + item := itr.Item() + if item.IsDeletedOrExpired() { + return nil, nil + } + if !bytes.Equal(key, item.Key()) { + // Return on the encounter with another key. + return nil, nil + } + + a := itr.Alloc + ka := a.Copy(key) + list := &pb.KVList{} + // We need to generate only a single delete marker per key. All the versions for this + // key will be considered deleted, if we delete the one at highest version. + kv := y.NewKV(a) + kv.Key = y.KeyWithTs(ka, item.Version()) + list.Kv = append(list.Kv, kv) + itr.Next() + return list, nil + } + + stream.Send = func(buf *z.Buffer) error { + kv := pb.KV{} + err := buf.SliceIterate(func(s []byte) error { + kv.Reset() + if err := kv.Unmarshal(s); err != nil { + return err + } + cbuf.WriteSlice(kv.Key) + return nil + }) + if err != nil { + return err + } + return handover(false) + } + if err := stream.Orchestrate(context.Background()); err != nil { + return err + } + // Flush the remaining skiplists if any. + return handover(true) + } + + // Iterate over all the prefixes and logically drop them. + for _, prefix := range prefixes { + if err := dropPrefix(prefix); err != nil { + return errors.Wrapf(err, "While dropping prefix: %#x", prefix) + } + } + + wg.Wait() + return nil +} + +// DropPrefix would drop all the keys with the provided prefix. Based on DB options, it either drops +// the prefixes by blocking the writes or doing a logical drop. +// See DropPrefixBlocking and DropPrefixNonBlocking for more information. +func (db *DB) DropPrefix(prefixes ...[]byte) error { + if db.opt.AllowStopTheWorld { + return db.DropPrefixBlocking(prefixes...) + } + return db.DropPrefixNonBlocking(prefixes...) +} + // DropPrefix would drop all the keys with the provided prefix. It does this in the following way: // - Stop accepting new writes. // - Stop memtable flushes before acquiring lock. Because we're acquring lock here @@ -1847,7 +1963,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact L0->L1, skipping over Kp. // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. -func (db *DB) DropPrefix(prefixes ...[]byte) error { +func (db *DB) DropPrefixBlocking(prefixes ...[]byte) error { if len(prefixes) == 0 { return nil } diff --git a/db2_test.go b/db2_test.go index d6c86ec6f..f92bc0963 100644 --- a/db2_test.go +++ b/db2_test.go @@ -31,6 +31,7 @@ import ( "regexp" "runtime" "sync" + "sync/atomic" "testing" "time" @@ -1055,3 +1056,97 @@ func TestKeyCount(t *testing.T) { require.NoError(t, stream.Orchestrate(context.Background())) require.Equal(t, N, uint64(count)) } + +func TestDropPrefixNonBlocking(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + db, err := OpenManaged(DefaultOptions(dir).WithAllowStopTheWorld(false)) + require.NoError(t, err) + defer db.Close() + + val := []byte("value") + + // Insert key-values + write := func() { + txn := db.NewTransactionAt(1, true) + defer txn.Discard() + require.NoError(t, txn.Set([]byte("aaa"), val)) + require.NoError(t, txn.Set([]byte("aab"), val)) + require.NoError(t, txn.Set([]byte("aba"), val)) + require.NoError(t, txn.Set([]byte("aca"), val)) + require.NoError(t, txn.CommitAt(2, nil)) + } + + read := func() { + txn := db.NewTransactionAt(6, false) + defer txn.Discard() + iterOpts := DefaultIteratorOptions + iterOpts.Prefix = []byte("aa") + it := txn.NewIterator(iterOpts) + defer it.Close() + + cnt := 0 + for it.Rewind(); it.Valid(); it.Next() { + fmt.Printf("%+v", it.Item()) + cnt++ + } + + require.Equal(t, 0, cnt) + } + + write() + prefixes := [][]byte{[]byte("aa")} + require.NoError(t, db.DropPrefix(prefixes...)) + read() +} + +func TestDropPrefixNonBlockingNoError(t *testing.T) { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + + opt := DefaultOptions(dir) + db, err := OpenManaged(opt) + require.NoError(t, err) + defer db.Close() + + clock := uint64(1) + + writer := func(db *DB, shouldFail bool, closer *z.Closer) { + val := []byte("value") + defer closer.Done() + // Insert key-values + for { + select { + case <-closer.HasBeenClosed(): + return + default: + txn := db.NewTransactionAt(atomic.AddUint64(&clock, 1), true) + require.NoError(t, txn.SetEntry(NewEntry([]byte("aaa"), val))) + + err := txn.CommitAt(atomic.AddUint64(&clock, 1), nil) + if shouldFail && err != nil { + require.Error(t, err, ErrBlockedWrites) + } else if !shouldFail { + require.NoError(t, err) + } + txn.Discard() + } + } + } + + closer := z.NewCloser(1) + go writer(db, true, closer) + time.Sleep(time.Millisecond * 100) + require.NoError(t, db.DropPrefixBlocking([]byte("aa"))) + closer.SignalAndWait() + + closer2 := z.NewCloser(1) + go writer(db, false, closer2) + time.Sleep(time.Millisecond * 50) + prefixes := [][]byte{[]byte("aa")} + require.NoError(t, db.DropPrefixNonBlocking(prefixes...)) + closer2.SignalAndWait() +} diff --git a/go.mod b/go.mod index d94c4067e..0e2a361e8 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.6-0.20210216161059-8cb8bacba7ba github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a + github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 github.com/dustin/go-humanize v1.0.0 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.3.1 diff --git a/go.sum b/go.sum index f14b4a123..e97ba7656 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= @@ -15,8 +17,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg= -github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= +github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3 h1:jU/wpYsEL+8JPLf/QcjkQKI5g0dOjSuwcMjkThxt5x0= +github.com/dgraph-io/ristretto v0.0.4-0.20210504190834-0bf2acd73aa3/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/options.go b/options.go index 3f9ba3395..0f6c6def7 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,9 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode + // AllowStopTheWorld determines whether the DropPrefix will be blocking/non-blocking. + AllowStopTheWorld bool + // DetectConflicts determines whether the transactions would be checked for // conflicts. The transactions can be processed at a higher rate when // conflict detection is disabled. @@ -140,6 +143,7 @@ func DefaultOptions(path string) Options { MaxLevels: 7, NumGoroutines: 8, MetricsEnabled: true, + AllowStopTheWorld: true, NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, @@ -674,6 +678,20 @@ func (opt Options) WithChecksumVerificationMode(cvMode options.ChecksumVerificat return opt } +// WithAllowStopTheWorld returns a new Options value with AllowStopTheWorld set to the given value. +// +// AllowStopTheWorld indicates whether the call to DropPrefix should block the writes or not. +// When set to false, the DropPrefix will do a logical delete and will not block +// the writes. Although, this will not immediately clear up the LSM tree. +// When set to false, the DropPrefix will block the writes and will clear up the LSM +// tree. +// +// The default value of AllowStopTheWorld is true. +func (opt Options) WithAllowStopTheWorld(b bool) Options { + opt.AllowStopTheWorld = b + return opt +} + // WithBlockCacheSize returns a new Options value with BlockCacheSize set to the given value. // // This value specifies how much data cache should hold in memory. A small size diff --git a/table/builder.go b/table/builder.go index 8322bb86f..55166cd11 100644 --- a/table/builder.go +++ b/table/builder.go @@ -153,6 +153,16 @@ func NewTableBuilder(opts Options) *Builder { return b } +func maxEncodedLen(ctype options.CompressionType, sz int) int { + switch ctype { + case options.Snappy: + return snappy.MaxEncodedLen(sz) + case options.ZSTD: + return y.ZSTDCompressBound(sz) + } + return sz +} + func (b *Builder) handleBlock() { defer b.wg.Done() @@ -175,7 +185,7 @@ func (b *Builder) handleBlock() { // BlockBuf should always less than or equal to allocated space. If the blockBuf is greater // than allocated space that means the data from this block cannot be stored in its // existing location. - allocatedSpace := (item.end) + padding + 1 + allocatedSpace := maxEncodedLen(b.opts.Compression, (item.end)) + padding + 1 y.AssertTrue(len(blockBuf) <= allocatedSpace) // blockBuf was allocated on allocator. So, we don't need to copy it over.