From aa1b655e7fb7d2176ab512f3b8b56065e4cdaf91 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 26 Nov 2018 13:11:22 -0800 Subject: [PATCH 1/2] Bring in Badger's prefix change during iterator creation. --- posting/index.go | 6 +- posting/mvcc.go | 1 + raftwal/storage.go | 22 +-- stream/stream.go | 65 ++++--- vendor/github.com/dgraph-io/badger/README.md | 2 +- vendor/github.com/dgraph-io/badger/backup.go | 45 ++++- vendor/github.com/dgraph-io/badger/db.go | 16 ++ .../github.com/dgraph-io/badger/iterator.go | 46 +++-- .../dgraph-io/badger/level_handler.go | 16 +- vendor/github.com/dgraph-io/badger/levels.go | 4 +- .../dgraph-io/badger/protos/backup.pb.go | 165 ++++++++++++------ .../dgraph-io/badger/protos/backup.proto | 5 +- .../dgraph-io/badger/protos/manifest.pb.go | 136 +++++++++++---- vendor/vendor.json | 38 ++-- worker/mutation.go | 11 +- worker/sort.go | 22 ++- worker/task.go | 16 +- worker/tokens.go | 19 +- 18 files changed, 419 insertions(+), 216 deletions(-) diff --git a/posting/index.go b/posting/index.go index 48a704639ae..0b4c77ae8be 100644 --- a/posting/index.go +++ b/posting/index.go @@ -394,13 +394,14 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t *pb.DirectedEdge, func deleteEntries(prefix []byte, remove func(key []byte) bool) error { return pstore.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions + opt.Prefix = prefix opt.PrefetchValues = false itr := txn.NewIterator(opt) defer itr.Close() writer := x.NewTxnWriter(pstore) - for itr.Seek(prefix); itr.ValidForPrefix(prefix); itr.Next() { + for itr.Rewind(); itr.Valid(); itr.Next() { item := itr.Item() if !remove(item.Key()) { continue @@ -490,6 +491,7 @@ func (r *rebuild) Run(ctx context.Context) error { r.startTs, hex.Dump(r.prefix)) opts := badger.DefaultIteratorOptions opts.AllVersions = true + opts.Prefix = r.prefix it := t.NewIterator(opts) defer it.Close() @@ -516,7 +518,7 @@ func (r *rebuild) Run(ctx context.Context) error { } var prevKey []byte - for it.Seek(r.prefix); it.ValidForPrefix(r.prefix); { + for it.Rewind(); it.Valid(); { item := it.Item() if bytes.Equal(item.Key(), prevKey) { it.Next() diff --git a/posting/mvcc.go b/posting/mvcc.go index 2eab5c1119c..572aa4077aa 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -238,6 +238,7 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) { l.minTs = item.Version() } else { iterOpts := badger.DefaultIteratorOptions + iterOpts.Prefix = key iterOpts.AllVersions = true it := txn.NewIterator(iterOpts) defer it.Close() diff --git a/raftwal/storage.go b/raftwal/storage.go index e6a086a102a..7d76ed26c10 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -191,12 +191,13 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6 err := w.db.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions opt.PrefetchValues = false + opt.Prefix = w.entryPrefix() opt.Reverse = reverse itr := txn.NewIterator(opt) defer itr.Close() itr.Seek(w.entryKey(seekTo)) - if !itr.ValidForPrefix(w.entryPrefix()) { + if !itr.Valid() { return errNotFound } item := itr.Item() @@ -246,14 +247,14 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error err := w.db.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions opt.PrefetchValues = false + opt.Prefix = w.entryPrefix() itr := txn.NewIterator(opt) defer itr.Close() start := w.entryKey(0) - prefix := w.entryPrefix() first := true var index uint64 - for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() { + for itr.Seek(start); itr.Valid(); itr.Next() { item := itr.Item() index = w.parseIndex(item.Key()) if first { @@ -380,13 +381,13 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error { var keys []string err := w.db.View(func(txn *badger.Txn) error { start := w.entryKey(from) - prefix := w.entryPrefix() opt := badger.DefaultIteratorOptions opt.PrefetchValues = false + opt.Prefix = w.entryPrefix() itr := txn.NewIterator(opt) defer itr.Close() - for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() { + for itr.Seek(start); itr.Valid(); itr.Next() { key := itr.Item().Key() keys = append(keys, string(key)) } @@ -437,12 +438,12 @@ func (w *DiskStorage) NumEntries() (int, error) { err := w.db.View(func(txn *badger.Txn) error { opt := badger.DefaultIteratorOptions opt.PrefetchValues = false + opt.Prefix = w.entryPrefix() itr := txn.NewIterator(opt) defer itr.Close() start := w.entryKey(0) - prefix := w.entryPrefix() - for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() { + for itr.Seek(start); itr.Valid(); itr.Next() { count++ } return nil @@ -467,16 +468,17 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er }) } - itr := txn.NewIterator(badger.DefaultIteratorOptions) + iopt := badger.DefaultIteratorOptions + iopt.Prefix = w.entryPrefix() + itr := txn.NewIterator(iopt) defer itr.Close() start := w.entryKey(lo) end := w.entryKey(hi) // Not included in results. - prefix := w.entryPrefix() var size, lastIndex uint64 first := true - for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() { + for itr.Seek(start); itr.Valid(); itr.Next() { item := itr.Item() var e pb.Entry if err := item.Value(func(val []byte) error { diff --git a/stream/stream.go b/stream/stream.go index f51e9b0f7b3..d6626c57186 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -22,6 +22,7 @@ import ( "time" "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/y" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -51,10 +52,12 @@ type keyRange struct { end []byte } -func (sl *Lists) Orchestrate(ctx context.Context, prefix string, ts uint64) error { - keyCh := make(chan keyRange, 100) // Contains keys for posting lists. - kvChan := make(chan *pb.KVS, 100) // Contains marshaled posting lists. - errCh := make(chan error, 1) // Stores error by consumeKeys. +func (sl *Lists) Orchestrate(ctx context.Context, logPrefix string, ts uint64) error { + keyCh := make(chan keyRange, 3) // Contains keys for posting lists. + // kvChan should only have a small capacity to ensure that we don't buffer up too much data, if + // sending is slow. So, setting this to 3. + kvChan := make(chan *pb.KVS, 3) // Contains marshaled posting lists. + errCh := make(chan error, 1) // Stores error by consumeKeys. // Read the predicate keys and stream to keysCh. go sl.produceRanges(ctx, ts, keyCh) @@ -77,7 +80,7 @@ func (sl *Lists) Orchestrate(ctx context.Context, prefix string, ts uint64) erro // Pick up key-values from kvChan and send to stream. kvErr := make(chan error, 1) go func() { - kvErr <- sl.streamKVs(ctx, prefix, kvChan) + kvErr <- sl.streamKVs(ctx, logPrefix, kvChan) }() wg.Wait() // Wait for produceKVs to be over. close(kvChan) // Now we can close kvChan. @@ -100,32 +103,15 @@ func (sl *Lists) produceRanges(ctx context.Context, ts uint64, keyCh chan keyRan if len(sl.Predicate) > 0 { prefix = x.PredicatePrefix(sl.Predicate) } - txn := sl.DB.NewTransactionAt(ts, false) - defer txn.Discard() - iterOpts := badger.DefaultIteratorOptions - iterOpts.PrefetchValues = false - it := txn.NewIterator(iterOpts) - defer it.Close() - - var start []byte - var size int64 - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - item := it.Item() - if len(start) == 0 { - start = item.KeyCopy(nil) - } - - size += item.EstimatedSize() - if size > pageSize { - kr := keyRange{start: start, end: item.KeyCopy(nil)} - keyCh <- kr - start = item.KeyCopy(nil) - size = 0 - } - } - if len(start) > 0 { - keyCh <- keyRange{start: start} + splits := sl.DB.KeySplits(prefix) + start := prefix + for _, key := range splits { + keyCh <- keyRange{start: start, end: y.SafeCopy(nil, []byte(key))} + start = y.SafeCopy(nil, []byte(key)) } + // Edge case: prefix is empty and no splits exist. In that case, we should have at least one + // keyRange output. + keyCh <- keyRange{start: start} close(keyCh) } @@ -136,18 +122,21 @@ func (sl *Lists) produceKVs(ctx context.Context, ts uint64, prefix = x.PredicatePrefix(sl.Predicate) } + var size int txn := sl.DB.NewTransactionAt(ts, false) defer txn.Discard() iterate := func(kr keyRange) error { iterOpts := badger.DefaultIteratorOptions iterOpts.AllVersions = true + iterOpts.Prefix = prefix iterOpts.PrefetchValues = false it := txn.NewIterator(iterOpts) defer it.Close() kvs := new(pb.KVS) var prevKey []byte - for it.Seek(kr.start); it.ValidForPrefix(prefix); { + for it.Seek(kr.start); it.Valid(); { + // it.Valid would only return true for keys with the provided Prefix in iterOpts. item := it.Item() if bytes.Equal(item.Key(), prevKey) { it.Next() @@ -171,6 +160,12 @@ func (sl *Lists) produceKVs(ctx context.Context, ts uint64, } if kv != nil { kvs.Kv = append(kvs.Kv, kv) + size += kv.Size() + } + if size >= pageSize { + kvChan <- kvs + kvs = new(pb.KVS) + size = 0 } } if len(kvs.Kv) > 0 { @@ -195,7 +190,7 @@ func (sl *Lists) produceKVs(ctx context.Context, ts uint64, } } -func (sl *Lists) streamKVs(ctx context.Context, prefix string, kvChan chan *pb.KVS) error { +func (sl *Lists) streamKVs(ctx context.Context, logPrefix string, kvChan chan *pb.KVS) error { var count int var bytesSent uint64 t := time.NewTicker(time.Second) @@ -224,7 +219,7 @@ func (sl *Lists) streamKVs(ctx context.Context, prefix string, kvChan chan *pb.K return err } glog.V(2).Infof("%s Created batch of size: %s in %s.\n", - prefix, humanize.Bytes(sz), time.Since(t)) + logPrefix, humanize.Bytes(sz), time.Since(t)) return nil } @@ -243,7 +238,7 @@ outer: } speed := bytesSent / durSec glog.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec\n", - prefix, x.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed)) + logPrefix, x.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed)) case kvs, ok := <-kvChan: if !ok { @@ -257,6 +252,6 @@ outer: } } - glog.Infof("%s Sent %d keys\n", prefix, count) + glog.Infof("%s Sent %d keys\n", logPrefix, count) return nil } diff --git a/vendor/github.com/dgraph-io/badger/README.md b/vendor/github.com/dgraph-io/badger/README.md index 7f98c4e55a8..557037f00a7 100644 --- a/vendor/github.com/dgraph-io/badger/README.md +++ b/vendor/github.com/dgraph-io/badger/README.md @@ -1,4 +1,4 @@ -# BadgerDB [![GoDoc](https://godoc.org/github.com/dgraph-io/badger?status.svg)](https://godoc.org/github.com/dgraph-io/badger) [![Go Report Card](https://goreportcard.com/badge/github.com/dgraph-io/badger)](https://goreportcard.com/report/github.com/dgraph-io/badger) [![Build Status](https://teamcity.dgraph.io/guestAuth/app/rest/builds/buildType:(id:Badger_UnitTests)/statusIcon.svg)](https://teamcity.dgraph.io/viewLog.html?buildTypeId=Badger_UnitTests&buildId=lastFinished&guest=1) ![Appveyor](https://ci.appveyor.com/api/projects/status/github/dgraph-io/badger?branch=master&svg=true) [![Coverage Status](https://coveralls.io/repos/github/dgraph-io/badger/badge.svg?branch=master)](https://coveralls.io/github/dgraph-io/badger?branch=master) +# BadgerDB [![GoDoc](https://godoc.org/github.com/dgraph-io/badger?status.svg)](https://godoc.org/github.com/dgraph-io/badger) [![Go Report Card](https://goreportcard.com/badge/github.com/dgraph-io/badger)](https://goreportcard.com/report/github.com/dgraph-io/badger) [![Sourcegraph](https://sourcegraph.com/github.com/dgraph-io/badger/-/badge.svg)](https://sourcegraph.com/github.com/dgraph-io/badger?badge) [![Build Status](https://teamcity.dgraph.io/guestAuth/app/rest/builds/buildType:(id:Badger_UnitTests)/statusIcon.svg)](https://teamcity.dgraph.io/viewLog.html?buildTypeId=Badger_UnitTests&buildId=lastFinished&guest=1) ![Appveyor](https://ci.appveyor.com/api/projects/status/github/dgraph-io/badger?branch=master&svg=true) [![Coverage Status](https://coveralls.io/repos/github/dgraph-io/badger/badge.svg?branch=master)](https://coveralls.io/github/dgraph-io/badger?branch=master) ![Badger mascot](images/diggy-shadow.png) diff --git a/vendor/github.com/dgraph-io/badger/backup.go b/vendor/github.com/dgraph-io/badger/backup.go index df38b69c6f5..c8e7ac30bfb 100644 --- a/vendor/github.com/dgraph-io/badger/backup.go +++ b/vendor/github.com/dgraph-io/badger/backup.go @@ -18,6 +18,7 @@ package badger import ( "bufio" + "bytes" "encoding/binary" "io" "sync" @@ -47,6 +48,7 @@ func writeTo(entry *protos.KVPair, w io.Writer) error { // This can be used to backup the data in a database at a given point in time. func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) { var tsNew uint64 + var skipKey []byte err := db.View(func(txn *Txn) error { opts := DefaultIteratorOptions opts.AllVersions = true @@ -55,28 +57,54 @@ func (db *DB) Backup(w io.Writer, since uint64) (uint64, error) { for it.Rewind(); it.Valid(); it.Next() { item := it.Item() - if item.Version() < since { - // Ignore versions less than given timestamp + if item.Version() < since || bytes.Equal(skipKey, item.Key()) { + // Ignore versions less than given timestamp, or skip older + // versions of the given skipKey. continue } - valCopy, err := item.ValueCopy(nil) - if err != nil { - Errorf("Key [%x]. Error while fetching value [%v]\n", item.Key(), err) - continue + skipKey = skipKey[:0] + + var valCopy []byte + if !item.IsDeletedOrExpired() { + // No need to copy value, if item is deleted or expired. + var err error + valCopy, err = item.ValueCopy(nil) + if err != nil { + Errorf("Key [%x, %d]. Error while fetching value [%v]\n", + item.Key(), item.Version(), err) + continue + } } + // clear txn bits + meta := item.meta &^ (bitTxn | bitFinTxn) + entry := &protos.KVPair{ Key: y.Copy(item.Key()), Value: valCopy, UserMeta: []byte{item.UserMeta()}, Version: item.Version(), ExpiresAt: item.ExpiresAt(), + Meta: []byte{meta}, } - - // Write entries to disk if err := writeTo(entry, w); err != nil { return err } + + switch { + case item.DiscardEarlierVersions(): + // If we need to discard earlier versions of this item, add a delete + // marker just below the current version. + entry.Version -= 1 + entry.Meta = []byte{bitDelete} + if err := writeTo(entry, w); err != nil { + return err + } + skipKey = item.KeyCopy(skipKey) + + case item.IsDeletedOrExpired(): + skipKey = item.KeyCopy(skipKey) + } } tsNew = txn.readTs return nil @@ -141,6 +169,7 @@ func (db *DB) Load(r io.Reader) error { Value: e.Value, UserMeta: e.UserMeta[0], ExpiresAt: e.ExpiresAt, + meta: e.Meta[0], }) // Update nextTxnTs, memtable stores this timestamp in badger head // when flushed. diff --git a/vendor/github.com/dgraph-io/badger/db.go b/vendor/github.com/dgraph-io/badger/db.go index b64fa5c6569..82846ef8472 100644 --- a/vendor/github.com/dgraph-io/badger/db.go +++ b/vendor/github.com/dgraph-io/badger/db.go @@ -23,6 +23,7 @@ import ( "math" "os" "path/filepath" + "sort" "strconv" "sync" "sync/atomic" @@ -1117,6 +1118,21 @@ func (db *DB) Tables() []TableInfo { return db.lc.getTableInfo() } +// KeySplits can be used to get rough key ranges to divide up iteration over +// the DB. +func (db *DB) KeySplits(prefix []byte) []string { + var splits []string + for _, ti := range db.Tables() { + // We don't use ti.Left, because that has a tendency to store !badger + // keys. + if bytes.HasPrefix(ti.Right, prefix) { + splits = append(splits, string(ti.Right)) + } + } + sort.Strings(splits) + return splits +} + // MaxBatchCount returns max possible entries in batch func (db *DB) MaxBatchCount() int64 { return db.opt.maxBatchCount diff --git a/vendor/github.com/dgraph-io/badger/iterator.go b/vendor/github.com/dgraph-io/badger/iterator.go index c8c7519dfc4..ace59102b28 100644 --- a/vendor/github.com/dgraph-io/badger/iterator.go +++ b/vendor/github.com/dgraph-io/badger/iterator.go @@ -318,12 +318,27 @@ type IteratorOptions struct { PrefetchValues bool // How many KV pairs to prefetch while iterating. Valid only if PrefetchValues is true. PrefetchSize int - Reverse bool // Direction of iteration. False is forward, true is backward. - AllVersions bool // Fetch all valid versions of the same key. + Prefix []byte // Only iterate over this given prefix. + Reverse bool // Direction of iteration. False is forward, true is backward. + AllVersions bool // Fetch all valid versions of the same key. internalAccess bool // Used to allow internal access to badger keys. } +func (opt *IteratorOptions) PickTable(left, right []byte) bool { + if len(opt.Prefix) == 0 { + return true + } + trim := func(key []byte) []byte { + if len(key) > len(opt.Prefix) { + return key[:len(opt.Prefix)] + } + return key + } + return bytes.Compare(trim(left), opt.Prefix) <= 0 && + bytes.Compare(trim(right), opt.Prefix) >= 0 +} + // DefaultIteratorOptions contains default options when iterating over Badger key-value stores. var DefaultIteratorOptions = IteratorOptions{ PrefetchValues: true, @@ -365,6 +380,8 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator { panic("Only one iterator can be active at one time, for a RW txn.") } + // TODO: If Prefix is set, only pick those memtables which have keys with + // the prefix. tables, decr := txn.db.getMemTables() defer decr() txn.db.vlog.incrIteratorCount() @@ -375,7 +392,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator { for i := 0; i < len(tables); i++ { iters = append(iters, tables[i].NewUniIterator(opt.Reverse)) } - iters = txn.db.lc.appendIterators(iters, opt.Reverse) // This will increment references. + iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references. res := &Iterator{ txn: txn, iitr: y.NewMergeIterator(iters, opt.Reverse), @@ -402,12 +419,17 @@ func (it *Iterator) Item() *Item { } // Valid returns false when iteration is done. -func (it *Iterator) Valid() bool { return it.item != nil } +func (it *Iterator) Valid() bool { + if it.item == nil { + return false + } + return bytes.HasPrefix(it.item.key, it.opt.Prefix) +} // ValidForPrefix returns false when iteration is done // or when the current key is not prefixed by the specified prefix. func (it *Iterator) ValidForPrefix(prefix []byte) bool { - return it.item != nil && bytes.HasPrefix(it.item.key, prefix) + return it.Valid() && bytes.HasPrefix(it.item.key, prefix) } // Close would close the iterator. It is important to call this when you're done with iteration. @@ -602,6 +624,9 @@ func (it *Iterator) Seek(key []byte) { } it.lastKey = it.lastKey[:0] + if len(key) == 0 { + key = it.opt.Prefix + } if len(key) == 0 { it.iitr.Rewind() it.prefetch() @@ -621,14 +646,5 @@ func (it *Iterator) Seek(key []byte) { // smallest key if iterating forward, and largest if iterating backward. It does not keep track of // whether the cursor started with a Seek(). func (it *Iterator) Rewind() { - i := it.data.pop() - for i != nil { - i.wg.Wait() // Just cleaner to wait before pushing. No ref counting needed. - it.waste.push(i) - i = it.data.pop() - } - - it.lastKey = it.lastKey[:0] - it.iitr.Rewind() - it.prefetch() + it.Seek(nil) } diff --git a/vendor/github.com/dgraph-io/badger/level_handler.go b/vendor/github.com/dgraph-io/badger/level_handler.go index d7295c4acc3..75ee07b88bb 100644 --- a/vendor/github.com/dgraph-io/badger/level_handler.go +++ b/vendor/github.com/dgraph-io/badger/level_handler.go @@ -266,16 +266,26 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { // appendIterators appends iterators to an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. -func (s *levelHandler) appendIterators(iters []y.Iterator, reversed bool) []y.Iterator { +func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator { s.RLock() defer s.RUnlock() + tables := make([]*table.Table, 0, len(s.tables)) + for _, t := range s.tables { + if opt.PickTable(t.Smallest(), t.Biggest()) { + tables = append(tables, t) + } + } + if len(tables) == 0 { + return iters + } + if s.level == 0 { // Remember to add in reverse order! // The newer table at the end of s.tables should be added first as it takes precedence. - return appendIteratorsReversed(iters, s.tables, reversed) + return appendIteratorsReversed(iters, tables, opt.Reverse) } - return append(iters, table.NewConcatIterator(s.tables, reversed)) + return append(iters, table.NewConcatIterator(tables, opt.Reverse)) } type levelHandlerRLocked struct{} diff --git a/vendor/github.com/dgraph-io/badger/levels.go b/vendor/github.com/dgraph-io/badger/levels.go index e4f3759f54f..8412c9fe8f9 100644 --- a/vendor/github.com/dgraph-io/badger/levels.go +++ b/vendor/github.com/dgraph-io/badger/levels.go @@ -810,11 +810,11 @@ func appendIteratorsReversed(out []y.Iterator, th []*table.Table, reversed bool) // appendIterators appends iterators to an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. func (s *levelsController) appendIterators( - iters []y.Iterator, reversed bool) []y.Iterator { + iters []y.Iterator, opt *IteratorOptions) []y.Iterator { // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing // data when there's a compaction. for _, level := range s.levels { - iters = level.appendIterators(iters, reversed) + iters = level.appendIterators(iters, opt) } return iters } diff --git a/vendor/github.com/dgraph-io/badger/protos/backup.pb.go b/vendor/github.com/dgraph-io/badger/protos/backup.pb.go index 13a9f61999b..c16e990b334 100644 --- a/vendor/github.com/dgraph-io/badger/protos/backup.pb.go +++ b/vendor/github.com/dgraph-io/badger/protos/backup.pb.go @@ -1,18 +1,6 @@ // Code generated by protoc-gen-gogo. DO NOT EDIT. // source: backup.proto -/* - Package protos is a generated protocol buffer package. - - It is generated from these files: - backup.proto - manifest.proto - - It has these top-level messages: - KVPair - ManifestChangeSet - ManifestChange -*/ package protos import proto "github.com/golang/protobuf/proto" @@ -33,17 +21,49 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type KVPair struct { - Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - UserMeta []byte `protobuf:"bytes,3,opt,name=userMeta,proto3" json:"userMeta,omitempty"` - Version uint64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` - ExpiresAt uint64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + UserMeta []byte `protobuf:"bytes,3,opt,name=userMeta,proto3" json:"userMeta,omitempty"` + Version uint64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` + ExpiresAt uint64 `protobuf:"varint,5,opt,name=expires_at,json=expiresAt,proto3" json:"expires_at,omitempty"` + Meta []byte `protobuf:"bytes,6,opt,name=meta,proto3" json:"meta,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *KVPair) Reset() { *m = KVPair{} } +func (m *KVPair) String() string { return proto.CompactTextString(m) } +func (*KVPair) ProtoMessage() {} +func (*KVPair) Descriptor() ([]byte, []int) { + return fileDescriptor_backup_7211d3e695800245, []int{0} +} +func (m *KVPair) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *KVPair) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_KVPair.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *KVPair) XXX_Merge(src proto.Message) { + xxx_messageInfo_KVPair.Merge(dst, src) +} +func (m *KVPair) XXX_Size() int { + return m.Size() +} +func (m *KVPair) XXX_DiscardUnknown() { + xxx_messageInfo_KVPair.DiscardUnknown(m) } -func (m *KVPair) Reset() { *m = KVPair{} } -func (m *KVPair) String() string { return proto.CompactTextString(m) } -func (*KVPair) ProtoMessage() {} -func (*KVPair) Descriptor() ([]byte, []int) { return fileDescriptorBackup, []int{0} } +var xxx_messageInfo_KVPair proto.InternalMessageInfo func (m *KVPair) GetKey() []byte { if m != nil { @@ -80,6 +100,13 @@ func (m *KVPair) GetExpiresAt() uint64 { return 0 } +func (m *KVPair) GetMeta() []byte { + if m != nil { + return m.Meta + } + return nil +} + func init() { proto.RegisterType((*KVPair)(nil), "protos.KVPair") } @@ -126,27 +153,18 @@ func (m *KVPair) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintBackup(dAtA, i, uint64(m.ExpiresAt)) } + if len(m.Meta) > 0 { + dAtA[i] = 0x32 + i++ + i = encodeVarintBackup(dAtA, i, uint64(len(m.Meta))) + i += copy(dAtA[i:], m.Meta) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } return i, nil } -func encodeFixed64Backup(dAtA []byte, offset int, v uint64) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - dAtA[offset+4] = uint8(v >> 32) - dAtA[offset+5] = uint8(v >> 40) - dAtA[offset+6] = uint8(v >> 48) - dAtA[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Backup(dAtA []byte, offset int, v uint32) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - return offset + 4 -} func encodeVarintBackup(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -157,6 +175,9 @@ func encodeVarintBackup(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *KVPair) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Key) @@ -177,6 +198,13 @@ func (m *KVPair) Size() (n int) { if m.ExpiresAt != 0 { n += 1 + sovBackup(uint64(m.ExpiresAt)) } + l = len(m.Meta) + if l > 0 { + n += 1 + l + sovBackup(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } return n } @@ -353,6 +381,37 @@ func (m *KVPair) Unmarshal(dAtA []byte) error { break } } + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Meta", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBackup + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthBackup + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Meta = append(m.Meta[:0], dAtA[iNdEx:postIndex]...) + if m.Meta == nil { + m.Meta = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipBackup(dAtA[iNdEx:]) @@ -365,6 +424,7 @@ func (m *KVPair) Unmarshal(dAtA []byte) error { if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) iNdEx += skippy } } @@ -479,19 +539,20 @@ var ( ErrIntOverflowBackup = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("backup.proto", fileDescriptorBackup) } +func init() { proto.RegisterFile("backup.proto", fileDescriptor_backup_7211d3e695800245) } -var fileDescriptorBackup = []byte{ - // 167 bytes of a gzipped FileDescriptorProto +var fileDescriptor_backup_7211d3e695800245 = []byte{ + // 180 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4a, 0x4c, 0xce, - 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x4a, 0xad, 0x8c, - 0x5c, 0x6c, 0xde, 0x61, 0x01, 0x89, 0x99, 0x45, 0x42, 0x02, 0x5c, 0xcc, 0xd9, 0xa9, 0x95, 0x12, - 0x8c, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x20, 0xa6, 0x90, 0x08, 0x17, 0x6b, 0x59, 0x62, 0x4e, 0x69, - 0xaa, 0x04, 0x13, 0x58, 0x0c, 0xc2, 0x11, 0x92, 0xe2, 0xe2, 0x28, 0x2d, 0x4e, 0x2d, 0xf2, 0x4d, - 0x2d, 0x49, 0x94, 0x60, 0x06, 0x4b, 0xc0, 0xf9, 0x42, 0x12, 0x5c, 0xec, 0x65, 0xa9, 0x45, 0xc5, - 0x99, 0xf9, 0x79, 0x12, 0x2c, 0x0a, 0x8c, 0x1a, 0x2c, 0x41, 0x30, 0xae, 0x90, 0x2c, 0x17, 0x57, - 0x6a, 0x45, 0x41, 0x66, 0x51, 0x6a, 0x71, 0x7c, 0x62, 0x89, 0x04, 0x2b, 0x58, 0x92, 0x13, 0x2a, - 0xe2, 0x58, 0xe2, 0x24, 0x70, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, 0xc9, - 0x31, 0xce, 0x78, 0x2c, 0xc7, 0x90, 0x04, 0x71, 0xa1, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xe7, - 0x3f, 0x3f, 0x95, 0xb8, 0x00, 0x00, 0x00, + 0x2e, 0x2d, 0xd0, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x4a, 0x33, 0x19, + 0xb9, 0xd8, 0xbc, 0xc3, 0x02, 0x12, 0x33, 0x8b, 0x84, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, + 0x18, 0x15, 0x18, 0x35, 0x78, 0x82, 0x40, 0x4c, 0x21, 0x11, 0x2e, 0xd6, 0xb2, 0xc4, 0x9c, 0xd2, + 0x54, 0x09, 0x26, 0xb0, 0x18, 0x84, 0x23, 0x24, 0xc5, 0xc5, 0x51, 0x5a, 0x9c, 0x5a, 0xe4, 0x9b, + 0x5a, 0x92, 0x28, 0xc1, 0x0c, 0x96, 0x80, 0xf3, 0x85, 0x24, 0xb8, 0xd8, 0xcb, 0x52, 0x8b, 0x8a, + 0x33, 0xf3, 0xf3, 0x24, 0x58, 0x14, 0x18, 0x35, 0x58, 0x82, 0x60, 0x5c, 0x21, 0x59, 0x2e, 0xae, + 0xd4, 0x8a, 0x82, 0xcc, 0xa2, 0xd4, 0xe2, 0xf8, 0xc4, 0x12, 0x09, 0x56, 0xb0, 0x24, 0x27, 0x54, + 0xc4, 0xb1, 0x44, 0x48, 0x88, 0x8b, 0x25, 0x17, 0x64, 0x20, 0x1b, 0xd8, 0x40, 0x30, 0xdb, 0x49, + 0xe0, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf1, 0x58, + 0x8e, 0x21, 0x09, 0xe2, 0x6a, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x7c, 0xe6, 0xe2, 0xc3, + 0xcc, 0x00, 0x00, 0x00, } diff --git a/vendor/github.com/dgraph-io/badger/protos/backup.proto b/vendor/github.com/dgraph-io/badger/protos/backup.proto index 0f4e3d61eee..06f483dde40 100644 --- a/vendor/github.com/dgraph-io/badger/protos/backup.proto +++ b/vendor/github.com/dgraph-io/badger/protos/backup.proto @@ -22,7 +22,8 @@ package protos; message KVPair { bytes key = 1; bytes value = 2; - bytes userMeta = 3; + bytes userMeta = 3; uint64 version = 4; uint64 expires_at = 5; -} \ No newline at end of file + bytes meta = 6; +} diff --git a/vendor/github.com/dgraph-io/badger/protos/manifest.pb.go b/vendor/github.com/dgraph-io/badger/protos/manifest.pb.go index d8db55f99de..98218437fc2 100644 --- a/vendor/github.com/dgraph-io/badger/protos/manifest.pb.go +++ b/vendor/github.com/dgraph-io/badger/protos/manifest.pb.go @@ -14,6 +14,12 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + type ManifestChange_Operation int32 const ( @@ -34,18 +40,49 @@ func (x ManifestChange_Operation) String() string { return proto.EnumName(ManifestChange_Operation_name, int32(x)) } func (ManifestChange_Operation) EnumDescriptor() ([]byte, []int) { - return fileDescriptorManifest, []int{1, 0} + return fileDescriptor_manifest_0dd769fc0ad4a032, []int{1, 0} } type ManifestChangeSet struct { // A set of changes that are applied atomically. - Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes" json:"changes,omitempty"` + Changes []*ManifestChange `protobuf:"bytes,1,rep,name=changes" json:"changes,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ManifestChangeSet) Reset() { *m = ManifestChangeSet{} } +func (m *ManifestChangeSet) String() string { return proto.CompactTextString(m) } +func (*ManifestChangeSet) ProtoMessage() {} +func (*ManifestChangeSet) Descriptor() ([]byte, []int) { + return fileDescriptor_manifest_0dd769fc0ad4a032, []int{0} +} +func (m *ManifestChangeSet) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ManifestChangeSet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ManifestChangeSet.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *ManifestChangeSet) XXX_Merge(src proto.Message) { + xxx_messageInfo_ManifestChangeSet.Merge(dst, src) +} +func (m *ManifestChangeSet) XXX_Size() int { + return m.Size() +} +func (m *ManifestChangeSet) XXX_DiscardUnknown() { + xxx_messageInfo_ManifestChangeSet.DiscardUnknown(m) } -func (m *ManifestChangeSet) Reset() { *m = ManifestChangeSet{} } -func (m *ManifestChangeSet) String() string { return proto.CompactTextString(m) } -func (*ManifestChangeSet) ProtoMessage() {} -func (*ManifestChangeSet) Descriptor() ([]byte, []int) { return fileDescriptorManifest, []int{0} } +var xxx_messageInfo_ManifestChangeSet proto.InternalMessageInfo func (m *ManifestChangeSet) GetChanges() []*ManifestChange { if m != nil { @@ -55,15 +92,46 @@ func (m *ManifestChangeSet) GetChanges() []*ManifestChange { } type ManifestChange struct { - Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` - Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=protos.ManifestChange_Operation" json:"Op,omitempty"` - Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` + Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"` + Op ManifestChange_Operation `protobuf:"varint,2,opt,name=Op,proto3,enum=protos.ManifestChange_Operation" json:"Op,omitempty"` + Level uint32 `protobuf:"varint,3,opt,name=Level,proto3" json:"Level,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ManifestChange) Reset() { *m = ManifestChange{} } +func (m *ManifestChange) String() string { return proto.CompactTextString(m) } +func (*ManifestChange) ProtoMessage() {} +func (*ManifestChange) Descriptor() ([]byte, []int) { + return fileDescriptor_manifest_0dd769fc0ad4a032, []int{1} +} +func (m *ManifestChange) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ManifestChange) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ManifestChange.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (dst *ManifestChange) XXX_Merge(src proto.Message) { + xxx_messageInfo_ManifestChange.Merge(dst, src) +} +func (m *ManifestChange) XXX_Size() int { + return m.Size() +} +func (m *ManifestChange) XXX_DiscardUnknown() { + xxx_messageInfo_ManifestChange.DiscardUnknown(m) } -func (m *ManifestChange) Reset() { *m = ManifestChange{} } -func (m *ManifestChange) String() string { return proto.CompactTextString(m) } -func (*ManifestChange) ProtoMessage() {} -func (*ManifestChange) Descriptor() ([]byte, []int) { return fileDescriptorManifest, []int{1} } +var xxx_messageInfo_ManifestChange proto.InternalMessageInfo func (m *ManifestChange) GetId() uint64 { if m != nil { @@ -118,6 +186,9 @@ func (m *ManifestChangeSet) MarshalTo(dAtA []byte) (int, error) { i += n } } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } return i, nil } @@ -151,27 +222,12 @@ func (m *ManifestChange) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintManifest(dAtA, i, uint64(m.Level)) } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } return i, nil } -func encodeFixed64Manifest(dAtA []byte, offset int, v uint64) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - dAtA[offset+4] = uint8(v >> 32) - dAtA[offset+5] = uint8(v >> 40) - dAtA[offset+6] = uint8(v >> 48) - dAtA[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Manifest(dAtA []byte, offset int, v uint32) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - return offset + 4 -} func encodeVarintManifest(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { dAtA[offset] = uint8(v&0x7f | 0x80) @@ -182,6 +238,9 @@ func encodeVarintManifest(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *ManifestChangeSet) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Changes) > 0 { @@ -190,10 +249,16 @@ func (m *ManifestChangeSet) Size() (n int) { n += 1 + l + sovManifest(uint64(l)) } } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } return n } func (m *ManifestChange) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Id != 0 { @@ -205,6 +270,9 @@ func (m *ManifestChange) Size() (n int) { if m.Level != 0 { n += 1 + sovManifest(uint64(m.Level)) } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } return n } @@ -293,6 +361,7 @@ func (m *ManifestChangeSet) Unmarshal(dAtA []byte) error { if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) iNdEx += skippy } } @@ -400,6 +469,7 @@ func (m *ManifestChange) Unmarshal(dAtA []byte) error { if (iNdEx + skippy) > l { return io.ErrUnexpectedEOF } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) iNdEx += skippy } } @@ -514,9 +584,9 @@ var ( ErrIntOverflowManifest = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("manifest.proto", fileDescriptorManifest) } +func init() { proto.RegisterFile("manifest.proto", fileDescriptor_manifest_0dd769fc0ad4a032) } -var fileDescriptorManifest = []byte{ +var fileDescriptor_manifest_0dd769fc0ad4a032 = []byte{ // 208 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcb, 0x4d, 0xcc, 0xcb, 0x4c, 0x4b, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x53, 0xc5, 0x4a, diff --git a/vendor/vendor.json b/vendor/vendor.json index 714115c2788..aec9167c131 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -421,50 +421,50 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "MukkNM0MBA1p8G3jlH15xpyQn4k=", + "checksumSHA1": "usnRpYL6s8vGf2zCn8vrSIFtTpE=", "path": "github.com/dgraph-io/badger", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "gGTDnTVVw5kcT2P5NXZV1YSckOU=", + "checksumSHA1": "d8wE18ae6lOhmJqh0jwwhmQCkII=", "path": "github.com/dgraph-io/badger/protos", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=", "path": "github.com/dgraph-io/badger/skl", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "I33KkP2lnYqJDasvvsAlebzkeko=", "path": "github.com/dgraph-io/badger/table", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "4CGMDSqxpw7KoDkyOw70a7qalkw=", "path": "github.com/dgraph-io/badger/y", - "revision": "0648e0ae67ff803989d0dc30426eabb7a5ca4c49", - "revisionTime": "2018-10-27T15:48:13Z", + "revision": "4751ef1180486b0a270988c35606ddbb277eca89", + "revisionTime": "2018-11-25T21:11:43Z", "version": "HEAD", "versionExact": "HEAD" }, @@ -855,14 +855,14 @@ { "checksumSHA1": "UxahDzW2v4mf/+aFxruuupaoIwo=", "path": "golang.org/x/net/internal/timeseries", - "revision": "9b4f9f5ad5197c79fd623a3638e70d8b26cef344", - "revisionTime": "2018-10-23T11:20:58Z" + "revision": "adae6a3d119ae4890b46832a2e88a95adc62b8e7", + "revisionTime": "2018-11-14T21:44:15Z" }, { - "checksumSHA1": "6ckrK99wkirarIfFNX4+AHWBEHM=", + "checksumSHA1": "4vGl3N46SAJwQl/uSlQvZQvc734=", "path": "golang.org/x/net/trace", - "revision": "9b4f9f5ad5197c79fd623a3638e70d8b26cef344", - "revisionTime": "2018-10-23T11:20:58Z" + "revision": "adae6a3d119ae4890b46832a2e88a95adc62b8e7", + "revisionTime": "2018-11-14T21:44:15Z" }, { "checksumSHA1": "REkmyB368pIiip76LiqMLspgCRk=", diff --git a/worker/mutation.go b/worker/mutation.go index 30982d42f7c..0cd5fb1287b 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -246,17 +246,18 @@ func updateSchemaType(attr string, typ types.TypeID, index uint64) { } func hasEdges(attr string, startTs uint64) bool { + pk := x.ParsedKey{Attr: attr} iterOpt := badger.DefaultIteratorOptions iterOpt.PrefetchValues = false + iterOpt.Prefix = pk.DataPrefix() + txn := pstore.NewTransactionAt(startTs, false) defer txn.Discard() + it := txn.NewIterator(iterOpt) defer it.Close() - pk := x.ParsedKey{ - Attr: attr, - } - prefix := pk.DataPrefix() - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + + for it.Rewind(); it.Valid(); it.Next() { // Check for non-empty posting // BitEmptyPosting is also a complete posting, // so checking for CompletePosting&BitCompletePosting > 0 would diff --git a/worker/sort.go b/worker/sort.go index 5b0e9693ba2..b55bc6d5a2e 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -152,14 +152,6 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { } order := ts.Order[0] - r := new(pb.SortResult) - // Iterate over every bucket / token. - iterOpt := badger.DefaultIteratorOptions - iterOpt.PrefetchValues = false - iterOpt.Reverse = order.Desc - txn := pstore.NewTransactionAt(ts.ReadTs, false) - defer txn.Discard() - typ, err := schema.State().TypeOf(order.Attr) if err != nil { return &sortresult{&emptySortResult, nil, fmt.Errorf("Attribute %s not defined in schema", order.Attr)} @@ -192,11 +184,17 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { return &sortresult{&emptySortResult, nil, x.Errorf("Attribute:%s is not sortable.", order.Attr)} } - indexPrefix := x.IndexKey(order.Attr, string(tokenizer.Identifier())) + // Iterate over every bucket / token. + iterOpt := badger.DefaultIteratorOptions + iterOpt.PrefetchValues = false + iterOpt.Reverse = order.Desc + iterOpt.Prefix = x.IndexKey(order.Attr, string(tokenizer.Identifier())) + txn := pstore.NewTransactionAt(ts.ReadTs, false) + defer txn.Discard() var seekKey []byte if !order.Desc { // We need to seek to the first key of this index type. - seekKey = indexPrefix + seekKey = nil // Would automatically seek to iterOpt.Prefix. } else { // We need to reach the last key of this index type. seekKey = x.IndexKey(order.Attr, string(tokenizer.Identifier()+1)) @@ -204,10 +202,10 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { itr := txn.NewIterator(iterOpt) defer itr.Close() + r := new(pb.SortResult) BUCKETS: - // Outermost loop is over index buckets. - for itr.Seek(seekKey); itr.ValidForPrefix(indexPrefix); itr.Next() { + for itr.Seek(seekKey); itr.Valid(); itr.Next() { item := itr.Item() key := item.Key() // No need to copy. select { diff --git a/worker/task.go b/worker/task.go index 3a464bf47ff..bda52180a2d 100644 --- a/worker/task.go +++ b/worker/task.go @@ -1677,20 +1677,19 @@ func (cp *countParams) evaluate(out *pb.Result) error { x.AssertTrue(count >= 1) countKey = x.CountKey(cp.attr, uint32(count), cp.reverse) + txn := pstore.NewTransactionAt(cp.readTs, false) + defer txn.Discard() + + pk := x.ParsedKey{Attr: cp.attr} itOpt := badger.DefaultIteratorOptions itOpt.PrefetchValues = false itOpt.Reverse = cp.fn == "le" || cp.fn == "lt" - txn := pstore.NewTransactionAt(cp.readTs, false) - defer txn.Discard() - pk := x.ParsedKey{ - Attr: cp.attr, - } - countPrefix := pk.CountPrefix(cp.reverse) + itOpt.Prefix = pk.CountPrefix(cp.reverse) itr := txn.NewIterator(itOpt) defer itr.Close() - for itr.Seek(countKey); itr.ValidForPrefix(countPrefix); itr.Next() { + for itr.Seek(countKey); itr.Valid(); itr.Next() { item := itr.Item() key := item.KeyCopy(nil) pl, err := posting.Get(key) @@ -1732,13 +1731,14 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { itOpt := badger.DefaultIteratorOptions itOpt.PrefetchValues = false itOpt.AllVersions = true + itOpt.Prefix = prefix it := txn.NewIterator(itOpt) defer it.Close() // This function could be switched to the stream.Lists framework, but after the change to use // BitCompletePosting, the speed here is already pretty fast. The slowdown for @lang predicates // occurs in filterStringFunction (like has(name) queries). - for it.Seek(startKey); it.ValidForPrefix(prefix); { + for it.Seek(startKey); it.Valid(); { item := it.Item() if bytes.Equal(item.Key(), prevKey) { it.Next() diff --git a/worker/tokens.go b/worker/tokens.go index e6ed2386bd6..d8c3be84c21 100644 --- a/worker/tokens.go +++ b/worker/tokens.go @@ -151,25 +151,26 @@ func getInequalityTokens(readTs uint64, attr, f string, return []string{ineqToken}, ineqToken, nil } - isgeOrGt := f == "ge" || f == "gt" - itOpt := badger.DefaultIteratorOptions - itOpt.PrefetchValues = false - itOpt.Reverse = !isgeOrGt - // TODO(txn): If some new index key was written as part of same transaction it won't be on disk - // until the txn is committed. Merge it with inmemory keys. + // If some new index key was written as part of same transaction it won't be on disk + // until the txn is committed. This is OK, we don't need to overlay in-memory contents on the + // DB, to keep the design simple and efficient. txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() - var out []string - indexPrefix := x.IndexKey(attr, string(tokenizer.Identifier())) seekKey := x.IndexKey(attr, ineqToken) + isgeOrGt := f == "ge" || f == "gt" + itOpt := badger.DefaultIteratorOptions + itOpt.PrefetchValues = false + itOpt.Reverse = !isgeOrGt + itOpt.Prefix = x.IndexKey(attr, string(tokenizer.Identifier())) itr := txn.NewIterator(itOpt) defer itr.Close() ineqTokenInBytes := []byte(ineqToken) //used for inequality comparison below - for itr.Seek(seekKey); itr.ValidForPrefix(indexPrefix); itr.Next() { + var out []string + for itr.Seek(seekKey); itr.Valid(); itr.Next() { item := itr.Item() key := item.Key() k := x.Parse(key) From 5dd9a3ae87a91058cfe65d4248a1ccb06ace63c0 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Mon, 26 Nov 2018 13:13:27 -0800 Subject: [PATCH 2/2] Use NewKeyIterator during posting list creation. --- posting/mvcc.go | 3 +- .../github.com/dgraph-io/badger/iterator.go | 40 ++++++++++++++++--- .../dgraph-io/badger/level_handler.go | 2 +- .../dgraph-io/badger/table/table.go | 7 ++++ vendor/vendor.json | 28 ++++++------- 5 files changed, 57 insertions(+), 23 deletions(-) diff --git a/posting/mvcc.go b/posting/mvcc.go index 572aa4077aa..ff6a6dd3ba5 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -238,9 +238,8 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) { l.minTs = item.Version() } else { iterOpts := badger.DefaultIteratorOptions - iterOpts.Prefix = key iterOpts.AllVersions = true - it := txn.NewIterator(iterOpts) + it := txn.NewKeyIterator(key, iterOpts) defer it.Close() it.Seek(key) l, err = ReadPostingList(key, it) diff --git a/vendor/github.com/dgraph-io/badger/iterator.go b/vendor/github.com/dgraph-io/badger/iterator.go index ace59102b28..d4d60ea0952 100644 --- a/vendor/github.com/dgraph-io/badger/iterator.go +++ b/vendor/github.com/dgraph-io/badger/iterator.go @@ -25,6 +25,7 @@ import ( "time" "github.com/dgraph-io/badger/options" + "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" ) @@ -318,14 +319,19 @@ type IteratorOptions struct { PrefetchValues bool // How many KV pairs to prefetch while iterating. Valid only if PrefetchValues is true. PrefetchSize int - Prefix []byte // Only iterate over this given prefix. - Reverse bool // Direction of iteration. False is forward, true is backward. - AllVersions bool // Fetch all valid versions of the same key. + Reverse bool // Direction of iteration. False is forward, true is backward. + AllVersions bool // Fetch all valid versions of the same key. + + // The following option is used to narrow down the SSTables that iterator picks up. If + // Prefix is specified, only tables which could have this prefix are picked based on their range + // of keys. + Prefix []byte // Only iterate over this given prefix. + prefixIsKey bool // If set, use the prefix for bloom filter lookup. internalAccess bool // Used to allow internal access to badger keys. } -func (opt *IteratorOptions) PickTable(left, right []byte) bool { +func (opt *IteratorOptions) PickTable(t table.TableInterface) bool { if len(opt.Prefix) == 0 { return true } @@ -335,8 +341,18 @@ func (opt *IteratorOptions) PickTable(left, right []byte) bool { } return key } - return bytes.Compare(trim(left), opt.Prefix) <= 0 && - bytes.Compare(trim(right), opt.Prefix) >= 0 + if bytes.Compare(trim(t.Smallest()), opt.Prefix) > 0 { + return false + } + if bytes.Compare(trim(t.Biggest()), opt.Prefix) < 0 { + return false + } + // Bloom filter lookup would only work if opt.Prefix does NOT have the read + // timestamp as part of the key. + if opt.prefixIsKey && t.DoesNotHave(opt.Prefix) { + return false + } + return true } // DefaultIteratorOptions contains default options when iterating over Badger key-value stores. @@ -402,6 +418,18 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator { return res } +// NewKeyIterator is just like NewIterator, but allows the user to iterate over all versions of a +// single key. Internally, it sets the Prefix option in provided opt, and uses that prefix to +// additionally run bloom filter lookups before picking tables from the LSM tree. +func (txn *Txn) NewKeyIterator(key []byte, opt IteratorOptions) *Iterator { + if len(opt.Prefix) > 0 { + panic("opt.Prefix should be nil for NewKeyIterator.") + } + opt.Prefix = key // This key must be without the timestamp. + opt.prefixIsKey = true + return txn.NewIterator(opt) +} + func (it *Iterator) newItem() *Item { item := it.waste.pop() if item == nil { diff --git a/vendor/github.com/dgraph-io/badger/level_handler.go b/vendor/github.com/dgraph-io/badger/level_handler.go index 75ee07b88bb..141e75ba6f3 100644 --- a/vendor/github.com/dgraph-io/badger/level_handler.go +++ b/vendor/github.com/dgraph-io/badger/level_handler.go @@ -272,7 +272,7 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) tables := make([]*table.Table, 0, len(s.tables)) for _, t := range s.tables { - if opt.PickTable(t.Smallest(), t.Biggest()) { + if opt.PickTable(t) { tables = append(tables, t) } } diff --git a/vendor/github.com/dgraph-io/badger/table/table.go b/vendor/github.com/dgraph-io/badger/table/table.go index 9804fa17644..7c95521f8d8 100644 --- a/vendor/github.com/dgraph-io/badger/table/table.go +++ b/vendor/github.com/dgraph-io/badger/table/table.go @@ -41,6 +41,13 @@ type keyOffset struct { len int } +// TableInterface is useful for testing. +type TableInterface interface { + Smallest() []byte + Biggest() []byte + DoesNotHave(key []byte) bool +} + // Table represents a loaded table file with the info we have about it type Table struct { sync.Mutex diff --git a/vendor/vendor.json b/vendor/vendor.json index aec9167c131..5da61d8820c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -421,50 +421,50 @@ "revisionTime": "2016-09-07T16:21:46Z" }, { - "checksumSHA1": "usnRpYL6s8vGf2zCn8vrSIFtTpE=", + "checksumSHA1": "/TDEEKkPFm6UmQVBmbBROvM9iJk=", "path": "github.com/dgraph-io/badger", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "oOuT7ebEiZ1ViHLKdFxKFOvobAQ=", "path": "github.com/dgraph-io/badger/options", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "d8wE18ae6lOhmJqh0jwwhmQCkII=", "path": "github.com/dgraph-io/badger/protos", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "00T6XbLV4d95J7hm6kTXDReaQHM=", "path": "github.com/dgraph-io/badger/skl", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" }, { - "checksumSHA1": "I33KkP2lnYqJDasvvsAlebzkeko=", + "checksumSHA1": "dCvRZ5BRgoLnfrL8rehjYt0Pmj4=", "path": "github.com/dgraph-io/badger/table", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" }, { "checksumSHA1": "4CGMDSqxpw7KoDkyOw70a7qalkw=", "path": "github.com/dgraph-io/badger/y", - "revision": "4751ef1180486b0a270988c35606ddbb277eca89", - "revisionTime": "2018-11-25T21:11:43Z", + "revision": "49a49e3217461bd89dba15c33edb90779bbfece2", + "revisionTime": "2018-11-26T21:07:12Z", "version": "HEAD", "versionExact": "HEAD" },