From 4b6b97df822b5686c3b9782a27d44f147671411b Mon Sep 17 00:00:00 2001 From: Ahsan Barkati Date: Tue, 27 Jul 2021 19:27:04 +0530 Subject: [PATCH] fix(rollups): Write rolled-up keys at ts+1 (#7957) (#7959) Write rolled up keys at (max ts of the deltas + 1) because if we write the rolled-up keys at the same ts as that of the delta, then in case of WAL replay the rolled-up key would get over-written by the delta which can bring DB to an invalid state. (cherry picked from commit 3831b490c88e3b4467dee95cfcd281e5285a05c9) --- posting/list.go | 15 +++++++++++-- posting/list_test.go | 53 ++++++++++++++++++++++++++------------------ posting/mvcc_test.go | 2 +- 3 files changed, 45 insertions(+), 25 deletions(-) diff --git a/posting/list.go b/posting/list.go index 236b11d94d0..767609860ba 100644 --- a/posting/list.go +++ b/posting/list.go @@ -822,12 +822,23 @@ func (l *List) Rollup() ([]*bpb.KV, error) { } var kvs []*bpb.KV + + // We set kv.Version to newMinTs + 1 because if we write the rolled up keys at the same ts as + // that of the delta, then in case of wal replay the rolled up key would get over-written by the + // delta which can bring db to an invalid state. + // It would be fine to write rolled up key at ts+1 and this key won't be overwritten by any + // other delta because there cannot be commit at ts as well as ts+1 on the same key. The reason + // is as follows: + // Suppose there are two inter-leaved txns [s1 s2 c1 c2] where si, ci is the start and commit + // of the i'th txn. In this case c2 would not have happened because of conflict. + // Suppose there are two disjoint txns [s1 c1 s2 c2], then c1 and c2 cannot be consecutive. kv := &bpb.KV{} - kv.Version = out.newMinTs + kv.Version = out.newMinTs + 1 kv.Key = l.key val, meta := marshalPostingList(out.plist) kv.UserMeta = []byte{meta} kv.Value = val + kvs = append(kvs, kv) for startUid, plist := range out.parts { @@ -879,7 +890,7 @@ func (l *List) SingleListRollup(kv *bpb.KV) error { func (out *rollupOutput) marshalPostingListPart( baseKey []byte, startUid uint64, plist *pb.PostingList) (*bpb.KV, error) { kv := &bpb.KV{} - kv.Version = out.newMinTs + kv.Version = out.newMinTs + 1 key, err := x.SplitKey(baseKey, startUid) if err != nil { return nil, errors.Wrapf(err, diff --git a/posting/list_test.go b/posting/list_test.go index 295e06b9bf8..9ce44cb6787 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -479,7 +479,7 @@ func TestMillion(t *testing.T) { } t.Logf("Completed a million writes.\n") - opt := ListOptions{ReadTs: uint64(N) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} l, err := ol.Uids(opt) require.NoError(t, err) require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids) @@ -916,6 +916,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 + curTs := 1 for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), @@ -924,10 +925,11 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { edge.Label = strconv.Itoa(i) } - txn := Txn{StartTs: uint64(i)} + txn := Txn{StartTs: uint64(curTs)} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + require.NoError(t, ol.commitMutation(uint64(curTs), uint64(curTs)+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup() require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) @@ -935,11 +937,12 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) { require.NoError(t, err) } commits++ + curTs++ } kvs, err := ol.Rollup() for _, kv := range kvs { - require.Equal(t, uint64(size+1), kv.Version) + require.Equal(t, uint64(curTs+1), kv.Version) } require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) @@ -1021,7 +1024,7 @@ func writePostingListToDisk(kvs []*bpb.KV) error { func TestMultiPartListBasic(t *testing.T) { size := int(1e5) ol, commits := createMultiPartList(t, size, false) - opt := ListOptions{ReadTs: uint64(size) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} l, err := ol.Uids(opt) require.NoError(t, err) require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids) @@ -1036,7 +1039,7 @@ func TestMultiPartListIterAfterUid(t *testing.T) { ol, _ := createMultiPartList(t, size, false) var visitedUids []uint64 - ol.Iterate(uint64(size+1), 50000, func(p *pb.Posting) error { + ol.Iterate(math.MaxUint64, 50000, func(p *pb.Posting) error { visitedUids = append(visitedUids, p.Uid) return nil }) @@ -1052,7 +1055,7 @@ func TestMultiPartListWithPostings(t *testing.T) { ol, commits := createMultiPartList(t, size, true) var labels []string - err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Label) > 0 { labels = append(labels, p.Label) } @@ -1089,7 +1092,7 @@ func TestMultiPartListMarshal(t *testing.T) { require.NoError(t, err) require.Equal(t, data, kvs[i+1].Value) require.Equal(t, []byte{BitCompletePosting}, kvs[i+1].UserMeta) - require.Equal(t, ol.minTs, kvs[i+1].Version) + require.Equal(t, ol.minTs+1, kvs[i+1].Version) } } @@ -1106,7 +1109,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) { newList, err := getNew(kvs[0].Key, ps, math.MaxUint64) require.NoError(t, err) - opt := ListOptions{ReadTs: uint64(size) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} originalUids, err := originalList.Uids(opt) require.NoError(t, err) newUids, err := newList.Uids(opt) @@ -1137,7 +1140,7 @@ func TestMultiPartListDelete(t *testing.T) { for _, kv := range kvs { require.Equal(t, []byte{BitEmptyPosting}, kv.UserMeta) - require.Equal(t, ol.minTs, kv.Version) + require.Equal(t, ol.minTs+1, kv.Version) } } @@ -1156,21 +1159,24 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { key := x.DataKey(uuid.New().String(), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) + var curTs uint64 for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: uint64(i)} + txn := Txn{StartTs: uint64(curTs)} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup() require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Verify all entries are in the list. @@ -1183,31 +1189,33 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { } // Delete the first half of the previously inserted entries from the list. - baseStartTs := uint64(size) + 1 for i := 1; i <= size/2; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: baseStartTs + uint64(i)} + txn := Txn{StartTs: curTs} addMutationHelper(t, ol, edge, Del, &txn) - require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup() require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Rollup list at the end of all the deletions. + curTs++ kvs, err := ol.Rollup() require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) for _, kv := range kvs { - require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version) + require.Equal(t, curTs, kv.Version) } // Verify that the entries were actually deleted. opt = ListOptions{ReadTs: math.MaxUint64} @@ -1219,22 +1227,23 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { } // Re-add the entries that were just deleted. - baseStartTs = uint64(2*size) + 1 - for i := 1; i <= 50000; i++ { + for i := 1; i <= size/2; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: baseStartTs + uint64(i)} + txn := Txn{StartTs: curTs} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup() require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Rollup list at the end of all the additions @@ -1272,7 +1281,7 @@ func TestSingleListRollup(t *testing.T) { require.Equal(t, 0, len(plist.Splits)) var labels []string - err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Label) > 0 { labels = append(labels, p.Label) } @@ -1323,7 +1332,7 @@ func TestRecursiveSplits(t *testing.T) { // Read back the list and verify the data is correct. var labels []string - err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Label) > 0 { labels = append(labels, p.Label) } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index cac28eca6b0..c59a3747c2a 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -58,7 +58,7 @@ func TestRollupTimestamp(t *testing.T) { // delete marker being the most recent update. kvs, err := nl.Rollup() require.NoError(t, err) - require.Equal(t, uint64(10), kvs[0].Version) + require.Equal(t, uint64(11), kvs[0].Version) } func TestPostingListRead(t *testing.T) {