From 6492d4b18c1d6f2a35cade4fc5f457f3e02d419b Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Mon, 8 Mar 2021 16:45:20 +0530 Subject: [PATCH 1/2] Fix hung rollups --- posting/list.go | 20 +++++++++++++------- posting/list_test.go | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/posting/list.go b/posting/list.go index a2cda2ca28e..58bd6f448ee 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1087,10 +1087,22 @@ func (ro *rollupOutput) getRange(uid uint64) (uint64, uint64) { return 1, math.MaxUint64 } +func shouldSplit(plist *pb.PostingList) (bool, error) { + r := roaring64.New() + if err := codec.FromPostingList(r, plist); err != nil { + return false, err + } + return plist.Size() >= maxListSize && r.GetCardinality() > 1, nil +} + func (ro *rollupOutput) runSplits() error { top: for startUid, pl := range ro.parts { - if pl.Size() >= maxListSize { + should, err := shouldSplit(pl) + if err != nil { + return err + } + if should { if err := ro.split(startUid); err != nil { return err } @@ -1103,7 +1115,6 @@ top: func (ro *rollupOutput) split(startUid uint64) error { pl := ro.parts[startUid] - // x.AssertTrue(pl.Size() >= maxListSize) r := roaring64.New() if err := codec.FromPostingList(r, pl); err != nil { @@ -1135,11 +1146,6 @@ func (ro *rollupOutput) split(startUid uint64) error { pl.Bitmap = codec.ToBytes(r) pl.Postings = pl.Postings[:idx] - // if startUid == uint64(126) { - // fmt.Printf("Start Split %d. First Part (%d -> %d, %d). Second (%d -> %d, %d)\n", - // startUid, r.Minimum(), r.Maximum(), len(newpl.Postings), - // nr.Minimum(), nr.Maximum(), len(pl.Postings)) - // } return nil } diff --git a/posting/list_test.go b/posting/list_test.go index 00624001db8..70c46ddbb68 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -977,6 +977,26 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { return ol, commits } +func TestLargePlistSplit(t *testing.T) { + key := x.DataKey(uuid.New().String(), 1331) + ol, err := getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + b := make([]byte, 30<<20) + rand.Read(b) + for i := 1; i <= 2; i++ { + edge := &pb.DirectedEdge{ + ValueId: uint64(i), + Facets: []*api.Facet{{Key: strconv.Itoa(i)}}, + Value: b, + } + txn := Txn{StartTs: uint64(i)} + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + } + _, err = ol.Rollup(nil) + require.NoError(t, err) +} + func TestDeleteStarMultiPartList(t *testing.T) { numEdges := 10000 From 0985cabd282ef73c24d033d69c802d0fa94bf9c5 Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Mon, 8 Mar 2021 21:23:22 +0530 Subject: [PATCH 2/2] Refactor --- posting/list.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/posting/list.go b/posting/list.go index 58bd6f448ee..4b1612a002e 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1088,11 +1088,14 @@ func (ro *rollupOutput) getRange(uid uint64) (uint64, uint64) { } func shouldSplit(plist *pb.PostingList) (bool, error) { - r := roaring64.New() - if err := codec.FromPostingList(r, plist); err != nil { - return false, err + if plist.Size() >= maxListSize { + r := roaring64.New() + if err := codec.FromPostingList(r, plist); err != nil { + return false, err + } + return r.GetCardinality() > 1, nil } - return plist.Size() >= maxListSize && r.GetCardinality() > 1, nil + return false, nil } func (ro *rollupOutput) runSplits() error {