From dc7383af7355e72d284bf361a9336bf13bfc5f7c Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 15 Jul 2020 13:13:48 -0700 Subject: [PATCH] fix(Dgraph): update reverse index when updating single UID predicates. (#5868) When updating a single uid predicate with a reverse index, the existing entry in the reverse index should be deleted first. Fixes DGRAPH-1738 (cherry picked from commit 5b70fe8aa2c4dc08f74794b3e9875c8789c11c9f) --- posting/index.go | 48 +++++++++++++++++---- systest/mutations_test.go | 88 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 8 deletions(-) diff --git a/posting/index.go b/posting/index.go index ffea25d3d96..12b8d00984b 100644 --- a/posting/index.go +++ b/posting/index.go @@ -211,8 +211,37 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd if err != nil { return err } + if plist == nil { + return errors.Errorf("nil posting list for reverse key %s", hex.Dump(key)) + } + + // For single uid predicates, updating the reverse index requires that the existing + // entries for this key in the index are removed. + pred, ok := schema.State().Get(ctx, t.Attr) + isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID && + t.Op == pb.DirectedEdge_SET && t.ValueId != 0 + if isSingleUidUpdate { + dataKey := x.DataKey(t.Attr, t.Entity) + dataList, err := getFn(dataKey) + if err != nil { + return errors.Wrapf(err, "cannot find single uid list to update with key %s", + hex.Dump(dataKey)) + } + err = dataList.Iterate(txn.StartTs, 0, func(p *pb.Posting) error { + delEdge := &pb.DirectedEdge{ + Entity: t.Entity, + ValueId: p.Uid, + Attr: t.Attr, + Op: pb.DirectedEdge_DEL, + } + return txn.addReverseAndCountMutation(ctx, delEdge) + }) + if err != nil { + return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s", + hex.Dump(dataKey)) + } + } - x.AssertTrue(plist != nil) // We must create a copy here. edge := &pb.DirectedEdge{ Entity: t.ValueId, @@ -233,6 +262,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd return err } } + return nil } @@ -437,6 +467,15 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr) hasCountIndex := schema.State().HasCount(ctx, edge.Attr) + + // Add reverse mutation irrespective of hasMutated, server crash can happen after + // mutation is synced and before reverse edge is synced + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { + if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { + return err + } + } + val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge) if err != nil { return err @@ -474,13 +513,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, } } } - // Add reverse mutation irrespective of hasMutated, server crash can happen after - // mutation is synced and before reverse edge is synced - if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { - if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { - return err - } - } return nil } diff --git a/systest/mutations_test.go b/systest/mutations_test.go index 514295f6a41..fd8908fae65 100644 --- a/systest/mutations_test.go +++ b/systest/mutations_test.go @@ -100,6 +100,7 @@ func TestSystem(t *testing.T) { t.Run("count index delete on non list predicate", wrap(CountIndexNonlistPredicateDelete)) t.Run("Reverse count index delete", wrap(ReverseCountIndexDelete)) t.Run("overwrite uid predicates", wrap(OverwriteUidPredicates)) + t.Run("overwrite uid predicates reverse index", wrap(OverwriteUidPredicatesReverse)) t.Run("delete and query same txn", wrap(DeleteAndQuerySameTxn)) } @@ -2253,6 +2254,93 @@ func OverwriteUidPredicates(t *testing.T, c *dgo.Dgraph) { string(resp.GetJson())) } +func OverwriteUidPredicatesReverse(t *testing.T, c *dgo.Dgraph) { + ctx := context.Background() + op := &api.Operation{DropAll: true} + require.NoError(t, c.Alter(ctx, op)) + + op = &api.Operation{ + Schema: ` + best_friend: uid @reverse . + name: string @index(exact) .`, + } + err := c.Alter(ctx, op) + require.NoError(t, err) + + txn := c.NewTxn() + _, err = txn.Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(` + _:alice "Alice" . + _:bob "Bob" . + _:alice _:bob .`), + }) + require.NoError(t, err) + + q := `{ + me(func: eq(name, Alice)) { + name + best_friend { + name + } + } +}` + resp, err := c.NewReadOnlyTxn().Query(ctx, q) + require.NoError(t, err) + testutil.CompareJSON(t, `{"me":[{"name":"Alice","best_friend": {"name": "Bob"}}]}`, + string(resp.GetJson())) + + reverseQuery := `{ + reverse(func: has(~best_friend)) { + name + ~best_friend { + name + } + }}` + resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery) + require.NoError(t, err) + testutil.CompareJSON(t, `{"reverse":[{"name":"Bob","~best_friend": [{"name": "Alice"}]}]}`, + string(resp.GetJson())) + + upsertQuery := `query { alice as var(func: eq(name, Alice)) }` + upsertMutation := &api.Mutation{ + SetNquads: []byte(` + _:carol "Carol" . + uid(alice) _:carol .`), + } + req := &api.Request{ + Query: upsertQuery, + Mutations: []*api.Mutation{upsertMutation}, + CommitNow: true, + } + _, err = c.NewTxn().Do(ctx, req) + require.NoError(t, err) + + resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery) + require.NoError(t, err) + testutil.CompareJSON(t, `{"reverse":[{"name":"Carol","~best_friend": [{"name": "Alice"}]}]}`, + string(resp.GetJson())) + + // Delete the triples and verify the reverse edge is gone. + upsertMutation = &api.Mutation{ + DelNquads: []byte(` + uid(alice) * .`), + } + req = &api.Request{ + Query: upsertQuery, + Mutations: []*api.Mutation{upsertMutation}, + CommitNow: true, + } + _, err = c.NewTxn().Do(ctx, req) + require.NoError(t, err) + + resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery) + require.NoError(t, err) + testutil.CompareJSON(t, `{"reverse":[]}`, + string(resp.GetJson())) + +} + func DeleteAndQuerySameTxn(t *testing.T, c *dgo.Dgraph) { // Set the schema. ctx := context.Background()