diff --git a/posting/list.go b/posting/list.go index 10ca4fe80be..3c993e9f668 100644 --- a/posting/list.go +++ b/posting/list.go @@ -601,10 +601,18 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e return err } -func (l *List) IsEmpty() bool { +func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { l.RLock() defer l.RUnlock() - return codec.ApproxLen(l.plist.Pack) == 0 && len(l.mutationMap) == 0 + var count int + err := l.iterate(readTs, afterUid, func(p *pb.Posting) error { + count++ + return ErrStopIteration + }) + if err != nil { + return false, err + } + return count == 0, nil } func (l *List) length(readTs, afterUid uint64) int { @@ -655,7 +663,7 @@ func (l *List) MarshalToKv() (*pb.KV, error) { } func marshalPostingList(plist *pb.PostingList) (data []byte, meta byte) { - if plist.Pack == nil { + if plist.Pack == nil || len(plist.Pack.Blocks) == 0 { return nil, BitEmptyPosting } data, err := plist.Marshal() diff --git a/worker/task.go b/worker/task.go index f48b882836b..83ac627b673 100644 --- a/worker/task.go +++ b/worker/task.go @@ -38,6 +38,7 @@ import ( "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" cindex "github.com/google/codesearch/index" cregexp "github.com/google/codesearch/regexp" @@ -561,12 +562,11 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti out.UidMatrix = append(out.UidMatrix, tlist) } case srcFn.fnType == HasFn: - len := pl.Length(args.q.ReadTs, 0) - if len == -1 { - return posting.ErrTsTooOld + empty, err := pl.IsEmpty(args.q.ReadTs, 0) + if err != nil { + return err } - count := int64(len) - if EvalCompare("gt", count, 0) { + if !empty { tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} out.UidMatrix = append(out.UidMatrix, tlist) } @@ -1003,12 +1003,25 @@ func filterGeoFunction(arg funcArgs) error { return nil } +// TODO: This function is really slow when there are a lot of UIDs to filter, for e.g. when used in +// `has(name)`. We could potentially have a query level cache, which can be used to speed things up +// a bit. Or, try to reduce the number of UIDs which make it here. func filterStringFunction(arg funcArgs) error { + if glog.V(3) { + glog.Infof("filterStringFunction. arg: %+v\n", arg.q) + defer glog.Infof("Done filterStringFunction") + } + attr := arg.q.Attr uids := algo.MergeSorted(arg.out.UidMatrix) var values [][]types.Val filteredUids := make([]uint64, 0, len(uids.Uids)) lang := langForFunc(arg.q.Langs) + + // This iteration must be done in a serial order, because we're also storing the values in a + // matrix, to check it later. + // TODO: This function can be optimized by having a query specific cache, which can be populated + // by the handleHasFunction for e.g. for a `has(name)` query. for _, uid := range uids.Uids { key := x.DataKey(attr, uid) pl, err := posting.Get(key) @@ -1615,11 +1628,11 @@ func (cp *countParams) evaluate(out *pb.Result) error { return nil } -// handleHasFunction looks at both the inmemory btree populated by -// posting/lists.go, and Badger. Thus, it can capture both committed -// transactions and in-progress transactions. It also accounts for transaction -// start ts. func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { + if glog.V(3) { + glog.Infof("handleHasFunction query: %+v\n", q) + } + txn := pstore.NewTransactionAt(q.ReadTs, false) defer txn.Discard() @@ -1637,13 +1650,15 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { result := &pb.List{} var prevKey []byte - var w int itOpt := badger.DefaultIteratorOptions itOpt.PrefetchValues = false itOpt.AllVersions = true it := txn.NewIterator(itOpt) defer it.Close() + // This function could be switched to the stream.Lists framework, but after the change to use + // BitCompletePosting, the speed here is already pretty fast. The slowdown for @lang predicates + // occurs in filterStringFunction (like has(name) queries). for it.Seek(startKey); it.ValidForPrefix(prefix); { item := it.Item() if bytes.Equal(item.Key(), prevKey) { @@ -1656,24 +1671,26 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { // iterator. pk := x.Parse(item.Key()) + // The following optimization speeds up this iteration considerably, because it avoids + // the need to run ReadPostingList. + if item.UserMeta()&posting.BitCompletePosting > 0 { + // This bit would only be set if there are valid uids in UidPack. + result.Uids = append(result.Uids, pk.Uid) + continue + } + // We do need to copy over the key for ReadPostingList. l, err := posting.ReadPostingList(item.KeyCopy(nil), it) if err != nil { return err } - var num int - if err = l.Iterate(q.ReadTs, 0, func(_ *pb.Posting) error { - num++ - return posting.ErrStopIteration - }); err != nil { + if empty, err := l.IsEmpty(q.ReadTs, 0); err != nil { return err - } - if num > 0 { + } else if !empty { result.Uids = append(result.Uids, pk.Uid) - w++ } - if w%1000 == 0 { + if len(result.Uids)%100000 == 0 { select { case <-ctx.Done(): return ctx.Err()