Skip to content

Commit

Permalink
fix(Dgraph): check for deleteBelowTs in pIterator.valid (#7288) (#7350)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajeetdsouza authored Jan 22, 2021
1 parent a3eff77 commit 92e837f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 29 deletions.
39 changes: 23 additions & 16 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type pIterator struct {
deleteBelowTs uint64
}

func (it *pIterator) init(l *List, afterUid, deleteBelowTs uint64) error {
func (it *pIterator) seek(l *List, afterUid, deleteBelowTs uint64) error {
if deleteBelowTs > 0 && deleteBelowTs <= l.minTs {
return errors.Errorf("deleteBelowTs (%d) must be greater than the minTs in the list (%d)",
deleteBelowTs, l.minTs)
Expand Down Expand Up @@ -209,24 +209,19 @@ func (it *pIterator) moveToNextValidPart() error {
return nil
}

// If there are no more UIDs to iterate over, move to the next part of the
// list that contains valid data.
if len(it.uids) == 0 {
for it.splitIdx <= len(it.l.plist.Splits)-2 {
// moveToNextPart will increment it.splitIdx. Therefore, the for loop must only
// continue until len(splits) - 2.
if err := it.moveToNextPart(); err != nil {
return err
}

if len(it.uids) > 0 {
return nil
}
// Iterate while there are no UIDs, and while we have more splits to iterate over.
for len(it.uids) == 0 && it.splitIdx < len(it.l.plist.Splits)-1 {
// moveToNextPart will increment it.splitIdx. Therefore, the for loop must only
// continue until len(splits)-1.
if err := it.moveToNextPart(); err != nil {
return err
}
}

return nil
}

// next advances pIterator to the next valid part.
func (it *pIterator) next() error {
if it.deleteBelowTs > 0 {
it.uids = nil
Expand All @@ -244,7 +239,14 @@ func (it *pIterator) next() error {
hex.EncodeToString(it.l.key))
}

// valid asserts that pIterator has valid uids, or advances it to the next valid part.
// It returns false if there are no more valid parts.
func (it *pIterator) valid() (bool, error) {
if it.deleteBelowTs > 0 {
it.uids = nil
return false, nil
}

if len(it.uids) > 0 {
return true, nil
}
Expand Down Expand Up @@ -558,7 +560,8 @@ func (l *List) setMutation(startTs uint64, data []byte) {
l.Unlock()
}

// Iterate will allow you to iterate over this posting List, while having acquired a read lock.
// Iterate will allow you to iterate over the mutable and immutable layers of
// this posting List, while having acquired a read lock.
// So, please keep this iteration cheap, otherwise mutations would get stuck.
// The iteration will start after the provided UID. The results would not include this uid.
// The function will loop until either the posting List is fully iterated, or you return a false
Expand Down Expand Up @@ -641,6 +644,7 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error {
l.AssertRLock()

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
Expand All @@ -660,7 +664,9 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
prevUid uint64
err error
)
err = pitr.init(l, afterUid, deleteBelowTs)

// pitr iterates through immutable postings
err = pitr.seek(l, afterUid, deleteBelowTs)
if err != nil {
return errors.Wrapf(err, "cannot initialize iterator when calling List.iterate")
}
Expand Down Expand Up @@ -1427,6 +1433,7 @@ func (l *List) Facets(readTs uint64, param *pb.FacetParams, langs []string,
return fcs, nil
}

// readListPart reads one split of a posting list from Badger.
func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) {
key, err := x.SplitKey(l.key, startUid)
if err != nil {
Expand Down
58 changes: 45 additions & 13 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ import (
"github.com/dgraph-io/dgraph/x"
)

func setMaxListSize(newMaxListSize int) {
maxListSize = newMaxListSize
}

func (l *List) PostingList() *pb.PostingList {
l.RLock()
defer l.RUnlock()
Expand Down Expand Up @@ -452,6 +456,7 @@ func TestAddMutation_mrjn1(t *testing.T) {

func TestMillion(t *testing.T) {
// Ensure list is stored in a single part.
defer setMaxListSize(maxListSize)
maxListSize = math.MaxInt32

key := x.DataKey("bal", 1331)
Expand Down Expand Up @@ -908,10 +913,8 @@ func verifySplits(t *testing.T, splits []uint64) {

func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -956,10 +959,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {

func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
// For testing, set the max list size to a lower threshold.
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1008,6 +1009,41 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
return ol, commits
}

func TestDeleteStarMultiPartList(t *testing.T) {
numEdges := 10000

list, _ := createMultiPartList(t, numEdges, false)
parsedKey, err := x.Parse(list.key)
require.NoError(t, err)

validateCount := func(expected int) {
count := 0
list.Iterate(math.MaxUint64, 0, func(posting *pb.Posting) error {
count++
return nil
})
require.Equal(t, expected, count)
}
validateCount(numEdges)

readTs := list.maxTs + 1
commitTs := readTs + 1

txn := NewTxn(readTs)
edge := &pb.DirectedEdge{
ValueId: parsedKey.Uid,
Attr: parsedKey.Attr,
Value: []byte(x.Star),
Op: pb.DirectedEdge_DEL,
}
err = list.addMutation(context.Background(), txn, edge)
require.NoError(t, err)

err = list.commitMutation(readTs, commitTs)
require.NoError(t, err)
validateCount(0)
}

func writePostingListToDisk(kvs []*bpb.KV) error {
writer := NewTxnWriter(pstore)
for _, kv := range kvs {
Expand Down Expand Up @@ -1217,10 +1253,8 @@ func TestMultiPartListDelete(t *testing.T) {
func TestMultiPartListDeleteAndAdd(t *testing.T) {
size := int(1e5)
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = 5000
defer func() {
maxListSize = math.MaxInt32
}()

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
Expand Down Expand Up @@ -1355,10 +1389,8 @@ func TestSingleListRollup(t *testing.T) {

func TestRecursiveSplits(t *testing.T) {
// For testing, set the max list size to a lower threshold.
defer setMaxListSize(maxListSize)
maxListSize = mb / 2
defer func() {
maxListSize = math.MaxInt32
}()

// Create a list that should be split recursively.
size := int(1e5)
Expand Down

0 comments on commit 92e837f

Please sign in to comment.