Set the Prefix option during iteration #2780

Merged
merged 2 commits on Nov 26, 2018
6 changes: 4 additions & 2 deletions posting/index.go
@@ -394,13 +394,14 @@ func (l *List) AddMutationWithIndex(ctx context.Context, t *pb.DirectedEdge,
func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
return pstore.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.Prefix = prefix
opt.PrefetchValues = false

itr := txn.NewIterator(opt)
defer itr.Close()

writer := x.NewTxnWriter(pstore)
for itr.Seek(prefix); itr.ValidForPrefix(prefix); itr.Next() {
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
if !remove(item.Key()) {
continue
@@ -490,6 +491,7 @@ func (r *rebuild) Run(ctx context.Context) error {
r.startTs, hex.Dump(r.prefix))
opts := badger.DefaultIteratorOptions
opts.AllVersions = true
opts.Prefix = r.prefix
it := t.NewIterator(opts)
defer it.Close()

@@ -516,7 +518,7 @@ func (r *rebuild) Run(ctx context.Context) error {
}

var prevKey []byte
for it.Seek(r.prefix); it.ValidForPrefix(r.prefix); {
for it.Rewind(); it.Valid(); {
item := it.Item()
if bytes.Equal(item.Key(), prevKey) {
it.Next()
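Note on the pattern above: instead of seeking to the prefix and re-checking ValidForPrefix on every step, the prefix is now handed to the iterator options up front, so a plain Valid() check already stops at the end of the prefix range and Badger can skip data that cannot match. A minimal sketch of the new shape, assuming a Badger version whose IteratorOptions has the Prefix field (countKeysWithPrefix is an illustrative name, not part of this PR):

package example

import "github.com/dgraph-io/badger"

// countKeysWithPrefix walks every key under prefix using the Prefix iterator
// option; it is an illustrative sketch, not code from this PR.
func countKeysWithPrefix(db *badger.DB, prefix []byte) (int, error) {
	count := 0
	err := db.View(func(txn *badger.Txn) error {
		opt := badger.DefaultIteratorOptions
		opt.Prefix = prefix        // Iterator yields only keys with this prefix.
		opt.PrefetchValues = false // Key-only scan; no need to fetch values.
		itr := txn.NewIterator(opt)
		defer itr.Close()

		// With opt.Prefix set, Rewind() lands on the first matching key and
		// Valid() turns false once the prefix range is exhausted.
		for itr.Rewind(); itr.Valid(); itr.Next() {
			count++
		}
		return nil
	})
	return count, err
}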
2 changes: 1 addition & 1 deletion posting/mvcc.go
@@ -239,7 +239,7 @@ func getNew(key []byte, pstore *badger.DB) (*List, error) {
} else {
iterOpts := badger.DefaultIteratorOptions
iterOpts.AllVersions = true
it := txn.NewIterator(iterOpts)
it := txn.NewKeyIterator(key, iterOpts)
defer it.Close()
it.Seek(key)
l, err = ReadPostingList(key, it)
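For the change above, NewKeyIterator restricts iteration to the versions of a single key, so the surrounding Seek/ReadPostingList logic no longer risks walking into neighbouring keys. A rough sketch of the call shape, assuming a Badger release that exposes Txn.NewKeyIterator (latestVersion is a made-up helper for illustration):

package example

import "github.com/dgraph-io/badger"

// latestVersion returns the newest version of a single key using a key-scoped
// iterator; illustrative sketch only, not code from this PR.
func latestVersion(txn *badger.Txn, key []byte) (uint64, bool) {
	iterOpts := badger.DefaultIteratorOptions
	iterOpts.AllVersions = true // Visit every version of the key, newest first.
	it := txn.NewKeyIterator(key, iterOpts)
	defer it.Close()

	it.Seek(key) // Positions at the newest version of exactly this key.
	if !it.Valid() {
		return 0, false
	}
	return it.Item().Version(), true
}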
22 changes: 12 additions & 10 deletions raftwal/storage.go
@@ -191,12 +191,13 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
opt.Reverse = reverse
itr := txn.NewIterator(opt)
defer itr.Close()

itr.Seek(w.entryKey(seekTo))
if !itr.ValidForPrefix(w.entryPrefix()) {
if !itr.Valid() {
return errNotFound
}
item := itr.Item()
@@ -246,14 +247,14 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.entryKey(0)
prefix := w.entryPrefix()
first := true
var index uint64
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
for itr.Seek(start); itr.Valid(); itr.Next() {
item := itr.Item()
index = w.parseIndex(item.Key())
if first {
@@ -380,13 +381,13 @@ func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
var keys []string
err := w.db.View(func(txn *badger.Txn) error {
start := w.entryKey(from)
prefix := w.entryPrefix()
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
itr := txn.NewIterator(opt)
defer itr.Close()

for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
for itr.Seek(start); itr.Valid(); itr.Next() {
key := itr.Item().Key()
keys = append(keys, string(key))
}
@@ -437,12 +438,12 @@ func (w *DiskStorage) NumEntries() (int, error) {
err := w.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.entryKey(0)
prefix := w.entryPrefix()
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
for itr.Seek(start); itr.Valid(); itr.Next() {
count++
}
return nil
@@ -467,16 +468,17 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
})
}

itr := txn.NewIterator(badger.DefaultIteratorOptions)
iopt := badger.DefaultIteratorOptions
iopt.Prefix = w.entryPrefix()
itr := txn.NewIterator(iopt)
defer itr.Close()

start := w.entryKey(lo)
end := w.entryKey(hi) // Not included in results.
prefix := w.entryPrefix()

var size, lastIndex uint64
first := true
for itr.Seek(start); itr.ValidForPrefix(prefix); itr.Next() {
for itr.Seek(start); itr.Valid(); itr.Next() {
item := itr.Item()
var e pb.Entry
if err := item.Value(func(val []byte) error {
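The raftwal changes rely on opt.Prefix composing with Seek: each loop can still start from an arbitrary entry key inside the prefix, and a plain Valid() check detects both the end of the data and the end of the prefix range, which is why the separate prefix variables and ValidForPrefix calls could be dropped. A small sketch of that shape (keysFrom is an illustrative helper; entryKey and entryPrefix in the real code play the roles of start and prefix):

package example

import "github.com/dgraph-io/badger"

// keysFrom collects all keys that share prefix and sort at or after start,
// mirroring how the raftwal iteration loops now work. Illustrative sketch only.
func keysFrom(db *badger.DB, prefix, start []byte) ([][]byte, error) {
	var keys [][]byte
	err := db.View(func(txn *badger.Txn) error {
		opt := badger.DefaultIteratorOptions
		opt.PrefetchValues = false
		opt.Prefix = prefix // Bounds the scan; no ValidForPrefix check needed.
		itr := txn.NewIterator(opt)
		defer itr.Close()

		// Seek jumps to start (or the next key after it); Valid() goes false
		// as soon as keys stop matching the prefix.
		for itr.Seek(start); itr.Valid(); itr.Next() {
			keys = append(keys, itr.Item().KeyCopy(nil))
		}
		return nil
	})
	return keys, err
}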
65 changes: 30 additions & 35 deletions stream/stream.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/dgraph-io/badger"
"github.com/dgraph-io/badger/y"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"

@@ -51,10 +52,12 @@ type keyRange struct {
end []byte
}

func (sl *Lists) Orchestrate(ctx context.Context, prefix string, ts uint64) error {
keyCh := make(chan keyRange, 100) // Contains keys for posting lists.
kvChan := make(chan *pb.KVS, 100) // Contains marshaled posting lists.
errCh := make(chan error, 1) // Stores error by consumeKeys.
func (sl *Lists) Orchestrate(ctx context.Context, logPrefix string, ts uint64) error {
keyCh := make(chan keyRange, 3) // Contains keys for posting lists.
// kvChan should only have a small capacity to ensure that we don't buffer up too much data, if
// sending is slow. So, setting this to 3.
kvChan := make(chan *pb.KVS, 3) // Contains marshaled posting lists.
errCh := make(chan error, 1) // Stores error by consumeKeys.

// Read the predicate keys and stream to keysCh.
go sl.produceRanges(ctx, ts, keyCh)
@@ -77,7 +80,7 @@ func (sl *Lists) Orchestrate(ctx context.Context, prefix string, ts uint64) erro
// Pick up key-values from kvChan and send to stream.
kvErr := make(chan error, 1)
go func() {
kvErr <- sl.streamKVs(ctx, prefix, kvChan)
kvErr <- sl.streamKVs(ctx, logPrefix, kvChan)
}()
wg.Wait() // Wait for produceKVs to be over.
close(kvChan) // Now we can close kvChan.
@@ -100,32 +103,15 @@ func (sl *Lists) produceRanges(ctx context.Context, ts uint64, keyCh chan keyRan
if len(sl.Predicate) > 0 {
prefix = x.PredicatePrefix(sl.Predicate)
}
txn := sl.DB.NewTransactionAt(ts, false)
defer txn.Discard()
iterOpts := badger.DefaultIteratorOptions
iterOpts.PrefetchValues = false
it := txn.NewIterator(iterOpts)
defer it.Close()

var start []byte
var size int64
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
if len(start) == 0 {
start = item.KeyCopy(nil)
}

size += item.EstimatedSize()
if size > pageSize {
kr := keyRange{start: start, end: item.KeyCopy(nil)}
keyCh <- kr
start = item.KeyCopy(nil)
size = 0
}
}
if len(start) > 0 {
keyCh <- keyRange{start: start}
splits := sl.DB.KeySplits(prefix)
start := prefix
for _, key := range splits {
keyCh <- keyRange{start: start, end: y.SafeCopy(nil, []byte(key))}
start = y.SafeCopy(nil, []byte(key))
}
// Edge case: prefix is empty and no splits exist. In that case, we should have at least one
// keyRange output.
keyCh <- keyRange{start: start}
close(keyCh)
}
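A note on the new produceRanges above: range splitting is now delegated to Badger itself. A rough sketch of the resulting pattern, assuming a Badger build that exports DB.KeySplits (the vendored db.go change in this PR is what adds it): split points come back as strings, consecutive pairs become [start, end) ranges, and a final open-ended range covers the tail.

package example

import (
	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/y"
)

type keyRange struct {
	start []byte
	end   []byte
}

// rangesForPrefix is an illustrative sketch of the produceRanges shape above:
// DB.KeySplits proposes split keys, and each adjacent pair becomes one range.
func rangesForPrefix(db *badger.DB, prefix []byte) []keyRange {
	var out []keyRange
	start := prefix
	for _, split := range db.KeySplits(prefix) {
		end := y.SafeCopy(nil, []byte(split))
		out = append(out, keyRange{start: start, end: end})
		start = end
	}
	// Always emit at least one range, even when there are no split points.
	out = append(out, keyRange{start: start})
	return out
}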

@@ -136,18 +122,21 @@ func (sl *Lists) produceKVs(ctx context.Context, ts uint64,
prefix = x.PredicatePrefix(sl.Predicate)
}

var size int
txn := sl.DB.NewTransactionAt(ts, false)
defer txn.Discard()
iterate := func(kr keyRange) error {
iterOpts := badger.DefaultIteratorOptions
iterOpts.AllVersions = true
iterOpts.Prefix = prefix
iterOpts.PrefetchValues = false
it := txn.NewIterator(iterOpts)
defer it.Close()

kvs := new(pb.KVS)
var prevKey []byte
for it.Seek(kr.start); it.ValidForPrefix(prefix); {
for it.Seek(kr.start); it.Valid(); {
// it.Valid would only return true for keys with the provided Prefix in iterOpts.
item := it.Item()
if bytes.Equal(item.Key(), prevKey) {
it.Next()
@@ -171,6 +160,12 @@
}
if kv != nil {
kvs.Kv = append(kvs.Kv, kv)
size += kv.Size()
}
if size >= pageSize {
kvChan <- kvs
kvs = new(pb.KVS)
size = 0
}
}
if len(kvs.Kv) > 0 {
@@ -195,7 +190,7 @@ func (sl *Lists) produceKVs(ctx context.Context, ts uint64,
}
}

func (sl *Lists) streamKVs(ctx context.Context, prefix string, kvChan chan *pb.KVS) error {
func (sl *Lists) streamKVs(ctx context.Context, logPrefix string, kvChan chan *pb.KVS) error {
var count int
var bytesSent uint64
t := time.NewTicker(time.Second)
@@ -224,7 +219,7 @@ func (sl *Lists) streamKVs(ctx context.Context, prefix string, kvChan chan *pb.K
return err
}
glog.V(2).Infof("%s Created batch of size: %s in %s.\n",
prefix, humanize.Bytes(sz), time.Since(t))
logPrefix, humanize.Bytes(sz), time.Since(t))
return nil
}

@@ -243,7 +238,7 @@
}
speed := bytesSent / durSec
glog.Infof("%s Time elapsed: %s, bytes sent: %s, speed: %s/sec\n",
prefix, x.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed))
logPrefix, x.FixedDuration(dur), humanize.Bytes(bytesSent), humanize.Bytes(speed))

case kvs, ok := <-kvChan:
if !ok {
Expand All @@ -257,6 +252,6 @@ outer:
}
}

glog.Infof("%s Sent %d keys\n", prefix, count)
glog.Infof("%s Sent %d keys\n", logPrefix, count)
return nil
}
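The produceKVs change above batches marshalled lists by accumulated size rather than by a fixed number of keys per range: once the running size crosses pageSize, the batch is pushed to kvChan and a new one starts. A stripped-down sketch of that batching strategy (plain byte slices stand in for pb.KV, and flush stands in for the channel send):

package example

// batchBySize groups items into batches whose combined size stays near the
// threshold, flushing each full batch and then whatever is left at the end.
// Illustrative sketch of the produceKVs batching above, not code from the PR.
func batchBySize(items [][]byte, threshold int, flush func(batch [][]byte)) {
	var batch [][]byte
	size := 0
	for _, item := range items {
		batch = append(batch, item)
		size += len(item)
		if size >= threshold {
			flush(batch)
			batch = nil
			size = 0
		}
	}
	if len(batch) > 0 {
		flush(batch) // Send the final, partially filled batch.
	}
}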
2 changes: 1 addition & 1 deletion vendor/github.com/dgraph-io/badger/README.md

45 changes: 37 additions & 8 deletions vendor/github.com/dgraph-io/badger/backup.go

16 changes: 16 additions & 0 deletions vendor/github.com/dgraph-io/badger/db.go
