Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func TestTokensTable(t *testing.T) {
attr := x.GalaxyAttr("name")
key := x.DataKey(attr, 1)
l, err := getNew(key, ps, math.MaxUint64)
l.mutationMap.readTs = 1
require.NoError(t, err)

edge := &pb.DirectedEdge{
Expand Down
22 changes: 17 additions & 5 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (mm *MutableLayer) setCurrentEntries(ts uint64, pl *pb.PostingList) {
return
}
if mm.readTs != 0 {
x.AssertTrue(mm.readTs == ts)
x.AssertTruef(mm.readTs == ts, "List object reused for a different transaction %d %d", mm.readTs, ts)
}

mm.readTs = ts
Expand Down Expand Up @@ -347,7 +347,7 @@ func (mm *MutableLayer) populateUidMap(pl *pb.PostingList) {
// insertPosting inserts a new posting in the mutable layers. It updates the currentUids map.
func (mm *MutableLayer) insertPosting(mpost *pb.Posting, hasCountIndex bool) {
if mm.readTs != 0 {
x.AssertTrue(mpost.StartTs == mm.readTs)
x.AssertTruef(mpost.StartTs == mm.readTs, "Diffrenent read ts and start ts found %d %d", mpost.StartTs, mm.readTs)
}

mm.readTs = mpost.StartTs
Expand Down Expand Up @@ -404,7 +404,7 @@ func (mm *MutableLayer) print() string {
mm.deleteAllMarker)
}

func (l *List) print() string {
func (l *List) Print() string {
return fmt.Sprintf("minTs: %d, plist: %+v, mutationMap: %s", l.minTs, l.plist, l.mutationMap.print())
}

Expand Down Expand Up @@ -693,6 +693,7 @@ func (it *pIterator) posting() *pb.Posting {
it.pidx++
}
it.uidPosting.Uid = uid
it.uidPosting.ValType = pb.Posting_UID
return it.uidPosting
}

Expand Down Expand Up @@ -993,6 +994,14 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
}
l.mutationMap.committedUidsTime = x.Max(l.mutationMap.committedUidsTime, commitTs)

if refresh {
newMap := make(map[uint64]*pb.Posting, len(l.mutationMap.committedUids))
for uid, post := range l.mutationMap.committedUids {
newMap[uid] = post
}
l.mutationMap.committedUids = newMap
}

for _, mpost := range pl.Postings {
if hasDeleteAll(mpost) {
l.mutationMap.deleteAllMarker = commitTs
Expand Down Expand Up @@ -1129,7 +1138,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
// pitr iterates through immutable postings
err = pitr.seek(l, afterUid, deleteBelowTs)
if err != nil {
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate "+l.print())
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate %v", l.Print())
}

loop:
Expand Down Expand Up @@ -1919,7 +1928,10 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {

// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
// means we need to return l.plist
return l.plist
if l.plist != nil && len(l.plist.Postings) > 0 {
return l.plist
}
return nil
}

// Value returns the default value from the posting list. The default value is
Expand Down
10 changes: 9 additions & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func TestGetSinglePosting(t *testing.T) {

res, err := l.StaticValue(1)
require.NoError(t, err)
fmt.Println(res, res == nil)
require.Equal(t, res == nil, true)

l.plist = create_pl(1, 1)
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestAddMutation_jchiu1(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 12)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)

ol.mutationMap.setTs(1)
// Set value to cars and merge to BadgerDB.
edge := &pb.DirectedEdge{
Value: []byte("cars"),
Expand Down Expand Up @@ -308,6 +309,7 @@ func TestAddMutation_DelSet(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1534)
ol, err := GetNoStore(key, math.MaxUint64)
require.NoError(t, err)
ol.mutationMap.setTs(1)

// DO sp*, don't commit
// Del a value cars and but don't merge.
Expand All @@ -323,6 +325,7 @@ func TestAddMutation_DelSet(t *testing.T) {
Value: []byte("newcars"),
}
ol1, err := GetNoStore(key, math.MaxUint64)
ol1.mutationMap.setTs(2)
require.NoError(t, err)
txn = &Txn{StartTs: 2}
addMutationHelper(t, ol1, edge, Set, txn)
Expand All @@ -334,6 +337,7 @@ func TestAddMutation_DelSet(t *testing.T) {
func TestAddMutation_DelRead(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 1543)
ol, err := GetNoStore(key, math.MaxUint64)
ol.mutationMap.setTs(1)
require.NoError(t, err)

// Set value to newcars, and commit it
Expand Down Expand Up @@ -372,6 +376,7 @@ func TestAddMutation_DelRead(t *testing.T) {
func TestAddMutation_jchiu2(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 15)
ol, err := GetNoStore(key, math.MaxUint64)
ol.mutationMap.setTs(1)
require.NoError(t, err)

// Del a value cars and but don't merge.
Expand All @@ -394,6 +399,7 @@ func TestAddMutation_jchiu2(t *testing.T) {
func TestAddMutation_jchiu2_Commit(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(x.GalaxyAttr("value")), 16)
ol, err := GetNoStore(key, math.MaxUint64)
ol.mutationMap.setTs(1)
require.NoError(t, err)

// Del a value cars and but don't merge.
Expand All @@ -419,6 +425,7 @@ func TestAddMutation_jchiu2_Commit(t *testing.T) {
func TestAddMutation_jchiu3(t *testing.T) {
key := x.DataKey(x.GalaxyAttr("value"), 29)
ol, err := GetNoStore(key, math.MaxUint64)
ol.mutationMap.setTs(1)
require.NoError(t, err)

// Set value to cars and merge to BadgerDB.
Expand Down Expand Up @@ -459,6 +466,7 @@ func TestAddMutation_jchiu3(t *testing.T) {
func TestAddMutation_mrjn1(t *testing.T) {
key := x.DataKey(x.GalaxyAttr("value"), 21)
ol, err := GetNoStore(key, math.MaxUint64)
ol.mutationMap.setTs(1)
require.NoError(t, err)

// Set a value cars and merge.
Expand Down
4 changes: 1 addition & 3 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
l := new(List)
l.key = key
l.plist = new(pb.PostingList)
l.mutationMap = newMutableLayer()
l.minTs = 0

// We use the following block of code to trigger incremental rollup on this key.
Expand Down Expand Up @@ -626,9 +627,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
return err
}
pl.CommitTs = item.Version()
if l.mutationMap == nil {
l.mutationMap = newMutableLayer()
}
l.mutationMap.insertCommittedPostings(pl)
return nil
})
Expand Down
118 changes: 118 additions & 0 deletions worker/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,124 @@ func TestEmptyTypeSchema(t *testing.T) {
x.ParseNamespaceAttr(types[0].TypeName)
}

func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) {
// Setup temporary directory for Badger DB
dir, err := os.MkdirTemp("", "storetest_")
require.NoError(t, err)
defer os.RemoveAll(dir)

opt := badger.DefaultOptions(dir)
ps, err := badger.OpenManaged(opt)
require.NoError(t, err)
posting.Init(ps, 0, false)
Init(ps)

// Set schema
schemaTxt := `
room: string @index(hash) @upsert .
person: string @index(hash) @upsert .
office: uid @reverse @count .
`
err = schema.ParseBytes([]byte(schemaTxt), 1)
require.NoError(t, err)

ctx := context.Background()
attrRoom := x.GalaxyAttr("room")
attrPerson := x.GalaxyAttr("person")
attrOffice := x.GalaxyAttr("office")

uidRoom := uint64(1)
uidJohn := uint64(2)

runMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
txn := posting.Oracle().RegisterStartTs(startTs)
for _, edge := range edges {
require.NoError(t, runMutation(ctx, edge, txn))
}
txn.Update()
writer := posting.NewTxnWriter(ps)
require.NoError(t, txn.CommitToDisk(writer, commitTs))
require.NoError(t, writer.Flush())
txn.UpdateCachedKeys(commitTs)
}

// Initial mutation: Set John → Leopard
runMutation(1, 3, []*pb.DirectedEdge{
{
Entity: uidJohn,
Attr: attrPerson,
Value: []byte("John Smith"),
ValueType: pb.Posting_STRING,
Op: pb.DirectedEdge_SET,
},
{
Entity: uidRoom,
Attr: attrRoom,
Value: []byte("Leopard"),
ValueType: pb.Posting_STRING,
Op: pb.DirectedEdge_SET,
},
{
Entity: uidJohn,
Attr: attrOffice,
ValueId: uidRoom,
ValueType: pb.Posting_UID,
Op: pb.DirectedEdge_SET,
},
})

key := x.DataKey(attrOffice, uidJohn)
rollup(t, key, ps, 4)

// Second mutation: Remove John from Leopard, assign Amanda
uidAmanda := uint64(3)

runMutation(6, 8, []*pb.DirectedEdge{
{
Entity: uidJohn,
Attr: attrOffice,
ValueId: uidRoom,
ValueType: pb.Posting_UID,
Op: pb.DirectedEdge_DEL,
},
{
Entity: uidAmanda,
Attr: attrPerson,
Value: []byte("Amanda Anderson"),
ValueType: pb.Posting_STRING,
Op: pb.DirectedEdge_SET,
},
{
Entity: uidAmanda,
Attr: attrOffice,
ValueId: uidRoom,
ValueType: pb.Posting_UID,
Op: pb.DirectedEdge_SET,
},
})

// Read and validate: Amanda assigned, John unassigned
txnRead := posting.Oracle().RegisterStartTs(10)

list, err := txnRead.Get(key)
require.NoError(t, err)

uids, err := list.Uids(posting.ListOptions{ReadTs: 10})
require.NoError(t, err)

// This assertion FAILS in the broken case where both Amanda and John are assigned
require.Equal(t, 0, len(uids.Uids), "John should no longer have an office assigned")

keyRev := x.ReverseKey(attrOffice, uidRoom)
listRev, err := txnRead.Get(keyRev)
require.NoError(t, err)

reverseUids, err := listRev.Uids(posting.ListOptions{ReadTs: 10})
require.NoError(t, err)

require.Equal(t, []uint64{uidAmanda}, reverseUids.Uids, "Only Amanda should be assigned on reverse edge")
}

func TestGetScalarList(t *testing.T) {
dir, err := os.MkdirTemp("", "storetest_")
x.Check(err)
Expand Down