Optimize has function #2724

Merged: 4 commits, Nov 5, 2018
14 changes: 11 additions & 3 deletions posting/list.go
@@ -601,10 +601,18 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
 	return err
 }
 
-func (l *List) IsEmpty() bool {
+func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
 	l.RLock()
 	defer l.RUnlock()
-	return codec.ApproxLen(l.plist.Pack) == 0 && len(l.mutationMap) == 0
+	var count int
+	err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
+		count++
+		return ErrStopIteration
+	})
+	if err != nil {
+		return false, err
+	}
+	return count == 0, nil
 }
 
 func (l *List) length(readTs, afterUid uint64) int {
@@ -655,7 +663,7 @@ func (l *List) MarshalToKv() (*pb.KV, error) {
 }
 
 func marshalPostingList(plist *pb.PostingList) (data []byte, meta byte) {
-	if plist.Pack == nil {
+	if plist.Pack == nil || len(plist.Pack.Blocks) == 0 {
 		return nil, BitEmptyPosting
 	}
 	data, err := plist.Marshal()
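The rewritten IsEmpty above no longer inspects the packed encoding or the mutation map directly; it asks iterate for at most one posting and bails out through the ErrStopIteration sentinel. Below is a minimal, self-contained sketch of that early-exit pattern, not Dgraph code: the names are illustrative, and where Dgraph's iterate absorbs the sentinel internally, this sketch filters it out at the call site.

package main

import (
	"errors"
	"fmt"
)

// errStop plays the role of posting.ErrStopIteration: a sentinel returned by a
// callback to abort iteration early without signalling a real failure.
var errStop = errors.New("stop iteration")

// isEmpty reports whether the given iterator yields any element at all. It
// requests at most one element and treats the sentinel as a normal outcome,
// mirroring the early-exit trick IsEmpty uses instead of counting postings.
func isEmpty(iterate func(func(int) error) error) (bool, error) {
	seen := false
	err := iterate(func(_ int) error {
		seen = true
		return errStop // stop after the first element
	})
	if err != nil && err != errStop {
		return false, err // a genuine error, not the sentinel
	}
	return !seen, nil
}

func main() {
	// A toy iterator over a slice, standing in for List.iterate.
	nums := []int{7, 8, 9}
	iterate := func(fn func(int) error) error {
		for _, n := range nums {
			if err := fn(n); err != nil {
				return err
			}
		}
		return nil
	}

	empty, err := isEmpty(iterate)
	fmt.Println(empty, err) // false <nil>
}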
55 changes: 36 additions & 19 deletions worker/task.go
@@ -38,6 +38,7 @@ import (
 	"github.com/dgraph-io/dgraph/types"
 	"github.com/dgraph-io/dgraph/types/facets"
 	"github.com/dgraph-io/dgraph/x"
+	"github.com/golang/glog"
 
 	cindex "github.com/google/codesearch/index"
 	cregexp "github.com/google/codesearch/regexp"
@@ -561,12 +562,11 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti
 			out.UidMatrix = append(out.UidMatrix, tlist)
 		}
 	case srcFn.fnType == HasFn:
-		len := pl.Length(args.q.ReadTs, 0)
-		if len == -1 {
-			return posting.ErrTsTooOld
+		empty, err := pl.IsEmpty(args.q.ReadTs, 0)
+		if err != nil {
+			return err
 		}
-		count := int64(len)
-		if EvalCompare("gt", count, 0) {
+		if !empty {
 			tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}}
 			out.UidMatrix = append(out.UidMatrix, tlist)
 		}
@@ -1003,12 +1003,25 @@ func filterGeoFunction(arg funcArgs) error {
 	return nil
 }
 
+// TODO: This function is really slow when there are a lot of UIDs to filter, for e.g. when used in
+// `has(name)`. We could potentially have a query level cache, which can be used to speed things up
+// a bit. Or, try to reduce the number of UIDs which make it here.
 func filterStringFunction(arg funcArgs) error {
+	if glog.V(3) {
+		glog.Infof("filterStringFunction. arg: %+v\n", arg.q)
+		defer glog.Infof("Done filterStringFunction")
+	}
+
 	attr := arg.q.Attr
 	uids := algo.MergeSorted(arg.out.UidMatrix)
 	var values [][]types.Val
 	filteredUids := make([]uint64, 0, len(uids.Uids))
 	lang := langForFunc(arg.q.Langs)
+
+	// This iteration must be done in a serial order, because we're also storing the values in a
+	// matrix, to check it later.
+	// TODO: This function can be optimized by having a query specific cache, which can be populated
+	// by the handleHasFunction for e.g. for a `has(name)` query.
 	for _, uid := range uids.Uids {
 		key := x.DataKey(attr, uid)
 		pl, err := posting.Get(key)
@@ -1615,11 +1628,11 @@ func (cp *countParams) evaluate(out *pb.Result) error {
 	return nil
 }
 
-// handleHasFunction looks at both the inmemory btree populated by
-// posting/lists.go, and Badger. Thus, it can capture both committed
-// transactions and in-progress transactions. It also accounts for transaction
-// start ts.
 func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error {
+	if glog.V(3) {
+		glog.Infof("handleHasFunction query: %+v\n", q)
+	}
+
 	txn := pstore.NewTransactionAt(q.ReadTs, false)
 	defer txn.Discard()
 
@@ -1637,13 +1650,15 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error {
 
 	result := &pb.List{}
 	var prevKey []byte
-	var w int
 	itOpt := badger.DefaultIteratorOptions
 	itOpt.PrefetchValues = false
 	itOpt.AllVersions = true
 	it := txn.NewIterator(itOpt)
 	defer it.Close()
 
+	// This function could be switched to the stream.Lists framework, but after the change to use
+	// BitCompletePosting, the speed here is already pretty fast. The slowdown for @lang predicates
+	// occurs in filterStringFunction (like has(name) queries).
 	for it.Seek(startKey); it.ValidForPrefix(prefix); {
 		item := it.Item()
 		if bytes.Equal(item.Key(), prevKey) {
@@ -1656,24 +1671,26 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error {
 		// iterator.
 		pk := x.Parse(item.Key())
 
+		// The following optimization speeds up this iteration considerably, because it avoids
+		// the need to run ReadPostingList.
+		if item.UserMeta()&posting.BitCompletePosting > 0 {
+			// This bit would only be set if there are valid uids in UidPack.
+			result.Uids = append(result.Uids, pk.Uid)
+			continue
+		}
+
 		// We do need to copy over the key for ReadPostingList.
 		l, err := posting.ReadPostingList(item.KeyCopy(nil), it)
 		if err != nil {
 			return err
 		}
-		var num int
-		if err = l.Iterate(q.ReadTs, 0, func(_ *pb.Posting) error {
-			num++
-			return posting.ErrStopIteration
-		}); err != nil {
+		if empty, err := l.IsEmpty(q.ReadTs, 0); err != nil {
 			return err
-		}
-		if num > 0 {
+		} else if !empty {
 			result.Uids = append(result.Uids, pk.Uid)
-			w++
 		}
 
-		if w%1000 == 0 {
+		if len(result.Uids)%100000 == 0 {
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
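For context on the hot loop above: handleHasFunction now walks Badger keys only (no value prefetch, all versions visible), skips duplicate versions via prevKey, and uses the BitCompletePosting user-meta bit to avoid ReadPostingList where possible. The following is a rough, self-contained sketch of just that key-only prefix-scan skeleton, with the Dgraph-specific fast path and posting-list fallback left out; keysWithPrefix is an illustrative helper, not part of Dgraph, and the DB is assumed to have been opened with badger.OpenManaged so that NewTransactionAt is available.

package sketch

import (
	"bytes"

	"github.com/dgraph-io/badger"
)

// keysWithPrefix sketches the key-only scan pattern handleHasFunction is built
// on: a snapshot read at readTs, no value prefetch, all versions visible, and a
// prevKey check so each key is considered only once. The Dgraph-specific parts
// (the BitCompletePosting fast path and the ReadPostingList/IsEmpty fallback)
// are omitted here.
func keysWithPrefix(db *badger.DB, readTs uint64, prefix []byte) [][]byte {
	txn := db.NewTransactionAt(readTs, false)
	defer txn.Discard()

	opts := badger.DefaultIteratorOptions
	opts.PrefetchValues = false // keys only; values are never fetched
	opts.AllVersions = true     // surface every version at or below readTs
	it := txn.NewIterator(opts)
	defer it.Close()

	var keys [][]byte
	var prevKey []byte
	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
		item := it.Item()
		if bytes.Equal(item.Key(), prevKey) {
			continue // an older version of a key we already handled
		}
		prevKey = item.KeyCopy(prevKey)
		keys = append(keys, item.KeyCopy(nil))
	}
	return keys
}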