diff --git a/dgraph/cmd/alpha/reindex_test.go b/dgraph/cmd/alpha/reindex_test.go index 0628e5a97d1..1043e2ff3fa 100644 --- a/dgraph/cmd/alpha/reindex_test.go +++ b/dgraph/cmd/alpha/reindex_test.go @@ -117,3 +117,71 @@ func TestReindexLang(t *testing.T) { } }`, res) } + +func TestReindexReverseCount(t *testing.T) { + require.NoError(t, dropAll()) + require.NoError(t, alterSchema(`value: [uid] .`)) + + m1 := `{ + set { + <1> <4> . + <1> <5> . + <1> <6> . + <1> <7> . + <1> <8> . + <2> <4> . + <2> <5> . + <2> <6> . + <3> <5> . + <3> <6> . + } + }` + _, err := mutationWithTs(m1, "application/rdf", false, true, 0) + require.NoError(t, err) + + // reindex + require.NoError(t, alterSchema(`value: [uid] @count @reverse .`)) + + q1 := `{ + q(func: eq(count(~value), "3")) { + uid + } + }` + res, _, err := queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + require.JSONEq(t, `{ + "data": { + "q": [ + { + "uid": "0x5" + }, + { + "uid": "0x6" + } + ] + } + }`, res) + + // adding another triplet + m2 := `{ set { <9> <4> . }}` + _, err = mutationWithTs(m2, "application/rdf", false, true, 0) + require.NoError(t, err) + + res, _, err = queryWithTs(q1, "application/graphql+-", "", 0) + require.NoError(t, err) + require.JSONEq(t, `{ + "data": { + "q": [ + { + "uid": "0x4" + }, + { + "uid": "0x5" + }, + { + "uid": "0x6" + } + ] + } + }`, res) +} diff --git a/posting/index.go b/posting/index.go index 084f4c08417..bd919a3e821 100644 --- a/posting/index.go +++ b/posting/index.go @@ -174,6 +174,30 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, } func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) error { + key := x.ReverseKey(t.Attr, t.ValueId) + plist, err := txn.GetFromDelta(key) + if err != nil { + return err + } + x.AssertTrue(plist != nil) + + // We must create a copy here. + edge := &pb.DirectedEdge{ + Entity: t.ValueId, + ValueId: t.Entity, + Attr: t.Attr, + Op: t.Op, + Facets: t.Facets, + } + if err := plist.addMutation(ctx, txn, edge); err != nil { + return err + } + + ostats.Record(ctx, x.NumEdges.M(1)) + return nil +} + +func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEdge) error { key := x.ReverseKey(t.Attr, t.ValueId) hasCountIndex := schema.State().HasCount(t.Attr) @@ -190,8 +214,37 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro if err != nil { return err } + if plist == nil { + return errors.Errorf("nil posting list for reverse key %s", hex.Dump(key)) + } + + // For single uid predicates, updating the reverse index requires that the existing + // entries for this key in the index are removed. + pred, ok := schema.State().Get(t.Attr) + isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && + t.Op == pb.DirectedEdge_SET && t.ValueId != 0 + if isSingleUidUpdate { + dataKey := x.DataKey(t.Attr, t.Entity) + dataList, err := getFn(dataKey) + if err != nil { + return errors.Wrapf(err, "cannot find single uid list to update with key %s", + hex.Dump(dataKey)) + } + err = dataList.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { + delEdge := &pb.DirectedEdge{ + Entity: t.Entity, + ValueId: p.Uid, + Attr: t.Attr, + Op: pb.DirectedEdge_DEL, + } + return txn.addReverseAndCountMutation(ctx, delEdge) + }) + if err != nil { + return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s", + hex.Dump(dataKey)) + } + } - x.AssertTrue(plist != nil) // We must create a copy here. edge := &pb.DirectedEdge{ Entity: t.ValueId, @@ -212,6 +265,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro return err } } + return nil } @@ -233,7 +287,7 @@ func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, case isReversed: // Delete reverse edge for each posting. delEdge.ValueId = p.Uid - return txn.addReverseMutation(ctx, delEdge) + return txn.addReverseAndCountMutation(ctx, delEdge) case isIndexed: // Delete index edge of each posting. val := types.Val{ @@ -284,7 +338,6 @@ func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count } ostats.Record(ctx, x.NumEdges.M(1)) return nil - } func (txn *Txn) updateCount(ctx context.Context, params countParams) error { @@ -293,9 +346,11 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error { Attr: params.attr, Op: pb.DirectedEdge_DEL, } - if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore), - params.reverse); err != nil { - return err + if params.countBefore > 0 { + if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore), + params.reverse); err != nil { + return err + } } if params.countAfter > 0 { @@ -393,6 +448,15 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr) hasCountIndex := schema.State().HasCount(edge.Attr) + + // Add reverse mutation irrespective of hasMutated, server crash can happen after + // mutation is synced and before reverse edge is synced + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { + if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { + return err + } + } + val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge) if err != nil { return err @@ -430,13 +494,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, } } } - // Add reverse mutation irrespective of hasMutated, server crash can happen after - // mutation is synced and before reverse edge is synced - if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { - if err := txn.addReverseMutation(ctx, edge); err != nil { - return err - } - } return nil } @@ -531,13 +588,11 @@ func (r *rebuilder) Run(ctx context.Context) error { dbOpts := badger.DefaultOptions(tmpIndexDir). WithSyncWrites(false). WithNumVersionsToKeep(math.MaxInt64). + WithLogger(&x.ToGlog{}). WithCompression(options.None). WithLogRotatesToFlush(10). WithBlockCacheSize(50) // TODO(Aman): Disable cache altogether - // TODO(Ibrahim): Remove this once badger is updated. - dbOpts.ZSTDCompressionLevel = 1 - tmpDB, err := badger.OpenManaged(dbOpts) if err != nil { return errors.Wrap(err, "error opening temp badger for reindexing") @@ -555,9 +610,6 @@ func (r *rebuilder) Run(ctx context.Context) error { // Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that // WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter // could be replaced with WriteBatch in the code - // Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that - // WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter - // could be replaced with WriteBatch in the code. tmpWriter := NewTxnWriter(tmpDB) stream := pstore.NewStreamAt(r.startTs) stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr) @@ -580,6 +632,9 @@ func (r *rebuilder) Run(ctx context.Context) error { return nil, errors.Wrapf(err, "error reading posting list from disk") } + // We are using different transactions in each call to KeyToList function. This could + // be a problem for computing reverse count indexes if deltas for same key are added + // in different transactions. Such a case doesn't occur for now. txn := NewTxn(r.startTs) if err := r.fn(pk.Uid, l, txn); err != nil { return nil, err @@ -986,6 +1041,8 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { edge.Label = pp.Label for { + // we only need to build reverse index here. + // We will update the reverse count index separately. err := txn.addReverseMutation(ctx, &edge) switch err { case ErrRetry: