Skip to content

Commit

Permalink
posting list changes. (#6303) (#6334)
Browse files Browse the repository at this point in the history
(cherry picked from commit 926dbe1b9deab34e58fb0a1380aa862219d22e75)
(cherry picked from commit deded8b)
  • Loading branch information
martinmr authored Aug 31, 2020
1 parent a7ea3e1 commit e8d40d6
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 31 deletions.
71 changes: 40 additions & 31 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,25 +882,7 @@ type rollupOutput struct {
newMinTs uint64
}

// Merge all entries in mutation layer with commitTs <= l.commitTs into
// immutable layer. Note that readTs can be math.MaxUint64, so do NOT use it
// directly. It should only serve as the read timestamp for iteration.
func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
l.AssertRLock()

// Pick all committed entries
if l.minTs > readTs {
// If we are already past the readTs, then skip the rollup.
return nil, nil
}

out := &rollupOutput{
plist: &pb.PostingList{
Splits: l.plist.Splits,
},
parts: make(map[uint64]*pb.PostingList),
}

func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error {
var plist *pb.PostingList
var startUid, endUid uint64
var splitIdx int
Expand Down Expand Up @@ -947,19 +929,48 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
})
// Finish writing the last part of the list (or the whole list if not a multi-part list).
if err != nil {
return nil, errors.Wrapf(err, "cannot iterate through the list")
return errors.Wrapf(err, "cannot iterate through the list")
}
plist.Pack = enc.Done()
if plist.Pack != nil {
if plist.Pack.BlockSize != uint32(blockSize) {
return nil, errors.Errorf("actual block size %d is different from expected value %d",
return errors.Errorf("actual block size %d is different from expected value %d",
plist.Pack.BlockSize, blockSize)
}
}

if len(l.plist.Splits) > 0 {
if split && len(l.plist.Splits) > 0 {
out.parts[startUid] = plist
}
return nil
}

// Merge all entries in mutation layer with commitTs <= l.commitTs into
// immutable layer. Note that readTs can be math.MaxUint64, so do NOT use it
// directly. It should only serve as the read timestamp for iteration.
func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
l.AssertRLock()

// Pick all committed entries
if l.minTs > readTs {
// If we are already past the readTs, then skip the rollup.
return nil, nil
}

out := &rollupOutput{
plist: &pb.PostingList{
Splits: l.plist.Splits,
},
parts: make(map[uint64]*pb.PostingList),
}

if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 {
if err := l.encode(out, readTs, split); err != nil {
return nil, errors.Wrapf(err, "while encoding")
}
} else {
// We already have a nicely packed posting list. Just use it.
out.plist = l.plist
}

maxCommitTs := l.minTs
{
Expand Down Expand Up @@ -1367,7 +1378,7 @@ func shouldSplit(plist *pb.PostingList) bool {
}

func (out *rollupOutput) updateSplits() {
if out.plist == nil {
if out.plist == nil || len(out.parts) > 0 {
out.plist = &pb.PostingList{}
}
out.plist.Splits = out.splits()
Expand Down Expand Up @@ -1451,13 +1462,11 @@ func binSplit(lowUid uint64, plist *pb.PostingList) ([]uint64, []*pb.PostingList
}

// Add elements in plist.Postings to the corresponding list.
for _, posting := range plist.Postings {
if posting.Uid < midUid {
lowPl.Postings = append(lowPl.Postings, posting)
} else {
highPl.Postings = append(highPl.Postings, posting)
}
}
pidx := sort.Search(len(plist.Postings), func(idx int) bool {
return plist.Postings[idx].Uid >= midUid
})
lowPl.Postings = plist.Postings[:pidx]
highPl.Postings = plist.Postings[pidx:]

return []uint64{lowUid, midUid}, []*pb.PostingList{lowPl, highPl}
}
Expand Down
2 changes: 2 additions & 0 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,8 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps)
require.NoError(t, err)
require.Nil(t, ol.plist.Pack)
require.Equal(t, 0, len(ol.plist.Postings))
require.True(t, len(ol.plist.Splits) > 0)
verifySplits(t, ol.plist.Splits)

Expand Down

0 comments on commit e8d40d6

Please sign in to comment.