diff --git a/posting/index_test.go b/posting/index_test.go index 36f2d2278f5..2f62efdcef4 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -184,6 +184,7 @@ func TestTokensTable(t *testing.T) { attr := x.GalaxyAttr("name") key := x.DataKey(attr, 1) l, err := getNew(key, ps, math.MaxUint64) + l.mutationMap.readTs = 1 require.NoError(t, err) edge := &pb.DirectedEdge{ diff --git a/posting/list.go b/posting/list.go index 9fc39544083..c3e6a970568 100644 --- a/posting/list.go +++ b/posting/list.go @@ -154,7 +154,7 @@ func (mm *MutableLayer) setCurrentEntries(ts uint64, pl *pb.PostingList) { return } if mm.readTs != 0 { - x.AssertTrue(mm.readTs == ts) + x.AssertTruef(mm.readTs == ts, "List object reused for a different transaction %d %d", mm.readTs, ts) } mm.readTs = ts @@ -347,7 +347,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) { // insertPosting inserts a new posting in the mutable layers. It updates the currentUids map. func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) { if mm.readTs != 0 { - x.AssertTrue(mpost.StartTs == mm.readTs) + x.AssertTruef(mpost.StartTs == mm.readTs, "Diffrenent read ts and start ts found %d %d", mpost.StartTs, mm.readTs) } mm.readTs = mpost.StartTs @@ -404,7 +404,7 @@ func (mm *MutableLayer) print() string { mm.deleteAllMarker) } -func (l *List) print() string { +func (l *List) Print() string { return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print()) } @@ -693,6 +693,7 @@ func (it *pIterator) posting() *pb.Posting { it.pidx++ } it.uidPosting.Uid = uid + it.uidPosting.ValType = pb.Posting_UID return it.uidPosting } @@ -993,6 +994,14 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi } l.mutationMap.committedUidsTime = x.Max(l.mutationMap.committedUidsTime, commitTs) + if refresh { + newMap := make(map[uint64]*pb.Posting, len(l.mutationMap.committedUids)) + for uid, post := range l.mutationMap.committedUids { + newMap[uid] = post + } + l.mutationMap.committedUids = newMap + } + for _, mpost := range pl.Postings { if hasDeleteAll(mpost) { l.mutationMap.deleteAllMarker = commitTs @@ -1129,7 +1138,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e // pitr iterates through immutable postings err = pitr.seek(l, afterUid, deleteBelowTs) if err != nil { - return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.print()) + return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate %v", l.Print()) } loop: @@ -1919,7 +1928,10 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList { // If we reach here, that means that there was no entry in mutation map which is less than readTs. That // means we need to return l.plist - return l.plist + if l.plist != nil && len(l.plist.Postings) > 0 { + return l.plist + } + return nil } // Value returns the default value from the posting list. The default value is diff --git a/posting/list_test.go b/posting/list_test.go index f4bdf9c1e8a..b9c6e8c9119 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -135,6 +135,7 @@ func TestGetSinglePosting(t *testing.T) { res, err := l.StaticValue(1) require.NoError(t, err) + fmt.Println(res, res == nil) require.Equal(t, res == nil, true) l.plist = create_pl(1, 1) @@ -265,7 +266,7 @@ func TestAddMutation_jchiu1(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 12) ol, err := GetNoStore(key, math.MaxUint64) require.NoError(t, err) - + ol.mutationMap.setTs(1) // Set value to cars and merge to BadgerDB. edge := &pb.DirectedEdge{ Value: []byte("cars"), @@ -308,6 +309,7 @@ func TestAddMutation_DelSet(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1534) ol, err := GetNoStore(key, math.MaxUint64) require.NoError(t, err) + ol.mutationMap.setTs(1) // DO sp*, don't commit // Del a value cars and but don't merge. @@ -323,6 +325,7 @@ func TestAddMutation_DelSet(t *testing.T) { Value: []byte("newcars"), } ol1, err := GetNoStore(key, math.MaxUint64) + ol1.mutationMap.setTs(2) require.NoError(t, err) txn = &Txn{StartTs: 2} addMutationHelper(t, ol1, edge, Set, txn) @@ -334,6 +337,7 @@ func TestAddMutation_DelSet(t *testing.T) { func TestAddMutation_DelRead(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1543) ol, err := GetNoStore(key, math.MaxUint64) + ol.mutationMap.setTs(1) require.NoError(t, err) // Set value to newcars, and commit it @@ -372,6 +376,7 @@ func TestAddMutation_DelRead(t *testing.T) { func TestAddMutation_jchiu2(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 15) ol, err := GetNoStore(key, math.MaxUint64) + ol.mutationMap.setTs(1) require.NoError(t, err) // Del a value cars and but don't merge. @@ -394,6 +399,7 @@ func TestAddMutation_jchiu2(t *testing.T) { func TestAddMutation_jchiu2_Commit(t *testing.T) { key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 16) ol, err := GetNoStore(key, math.MaxUint64) + ol.mutationMap.setTs(1) require.NoError(t, err) // Del a value cars and but don't merge. @@ -419,6 +425,7 @@ func TestAddMutation_jchiu2_Commit(t *testing.T) { func TestAddMutation_jchiu3(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 29) ol, err := GetNoStore(key, math.MaxUint64) + ol.mutationMap.setTs(1) require.NoError(t, err) // Set value to cars and merge to BadgerDB. @@ -459,6 +466,7 @@ func TestAddMutation_jchiu3(t *testing.T) { func TestAddMutation_mrjn1(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 21) ol, err := GetNoStore(key, math.MaxUint64) + ol.mutationMap.setTs(1) require.NoError(t, err) // Set a value cars and merge. diff --git a/posting/mvcc.go b/posting/mvcc.go index ea734814674..ff232367113 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -580,6 +580,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { l := new(List) l.key = key l.plist = new(pb.PostingList) + l.mutationMap = newMutableLayer() l.minTs = 0 // We use the following block of code to trigger incremental rollup on this key. @@ -626,9 +627,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return err } pl.CommitTs = item.Version() - if l.mutationMap == nil { - l.mutationMap = newMutableLayer() - } l.mutationMap.insertCommittedPostings(pl) return nil }) diff --git a/worker/sort_test.go b/worker/sort_test.go index 8f4be2684ed..68c3adb6415 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -101,6 +101,124 @@ func TestEmptyTypeSchema(t *testing.T) { x.ParseNamespaceAttr(types[0].TypeName) } +func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) { + // Setup temporary directory for Badger DB + dir, err := os.MkdirTemp("", "storetest_") + require.NoError(t, err) + defer os.RemoveAll(dir) + + opt := badger.DefaultOptions(dir) + ps, err := badger.OpenManaged(opt) + require.NoError(t, err) + posting.Init(ps, 0, false) + Init(ps) + + // Set schema + schemaTxt := ` + room: string @index(hash) @upsert . + person: string @index(hash) @upsert . + office: uid @reverse @count . + ` + err = schema.ParseBytes([]byte(schemaTxt), 1) + require.NoError(t, err) + + ctx := context.Background() + attrRoom := x.GalaxyAttr("room") + attrPerson := x.GalaxyAttr("person") + attrOffice := x.GalaxyAttr("office") + + uidRoom := uint64(1) + uidJohn := uint64(2) + + runMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) { + txn := posting.Oracle().RegisterStartTs(startTs) + for _, edge := range edges { + require.NoError(t, runMutation(ctx, edge, txn)) + } + txn.Update() + writer := posting.NewTxnWriter(ps) + require.NoError(t, txn.CommitToDisk(writer, commitTs)) + require.NoError(t, writer.Flush()) + txn.UpdateCachedKeys(commitTs) + } + + // Initial mutation: Set John → Leopard + runMutation(1, 3, []*pb.DirectedEdge{ + { + Entity: uidJohn, + Attr: attrPerson, + Value: []byte("John Smith"), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }, + { + Entity: uidRoom, + Attr: attrRoom, + Value: []byte("Leopard"), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }, + { + Entity: uidJohn, + Attr: attrOffice, + ValueId: uidRoom, + ValueType: pb.Posting_UID, + Op: pb.DirectedEdge_SET, + }, + }) + + key := x.DataKey(attrOffice, uidJohn) + rollup(t, key, ps, 4) + + // Second mutation: Remove John from Leopard, assign Amanda + uidAmanda := uint64(3) + + runMutation(6, 8, []*pb.DirectedEdge{ + { + Entity: uidJohn, + Attr: attrOffice, + ValueId: uidRoom, + ValueType: pb.Posting_UID, + Op: pb.DirectedEdge_DEL, + }, + { + Entity: uidAmanda, + Attr: attrPerson, + Value: []byte("Amanda Anderson"), + ValueType: pb.Posting_STRING, + Op: pb.DirectedEdge_SET, + }, + { + Entity: uidAmanda, + Attr: attrOffice, + ValueId: uidRoom, + ValueType: pb.Posting_UID, + Op: pb.DirectedEdge_SET, + }, + }) + + // Read and validate: Amanda assigned, John unassigned + txnRead := posting.Oracle().RegisterStartTs(10) + + list, err := txnRead.Get(key) + require.NoError(t, err) + + uids, err := list.Uids(posting.ListOptions{ReadTs: 10}) + require.NoError(t, err) + + // This assertion FAILS in the broken case where both Amanda and John are assigned + require.Equal(t, 0, len(uids.Uids), "John should no longer have an office assigned") + + keyRev := x.ReverseKey(attrOffice, uidRoom) + listRev, err := txnRead.Get(keyRev) + require.NoError(t, err) + + reverseUids, err := listRev.Uids(posting.ListOptions{ReadTs: 10}) + require.NoError(t, err) + + require.Equal(t, []uint64{uidAmanda}, reverseUids.Uids, "Only Amanda should be assigned on reverse edge") +} + func TestGetScalarList(t *testing.T) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err)