diff --git a/posting/list.go b/posting/list.go index 73b12544f1d..e085f52d6be 100644 --- a/posting/list.go +++ b/posting/list.go @@ -914,7 +914,16 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) { var kvs []*bpb.KV kv := MarshalPostingList(out.plist, alloc) - kv.Version = out.newMinTs + // We set kv.Version to newMinTs + 1 because if we write the rolled up keys at the same ts as + // that of the delta, then in case of wal replay the rolled up key would get over-written by the + // delta which can bring db to an invalid state. + // It would be fine to write rolled up key at ts+1 and this key won't be overwritten by any + // other delta because there cannot be commit at ts as well as ts+1 on the same key. The reason + // is as follows: + // Suppose there are two inter-leaved txns [s1 s2 c1 c2] where si, ci is the start and commit + // of the i'th txn. In this case c2 would not have happened because of conflict. + // Suppose there are two disjoint txns [s1 c1 s2 c2], then c1 and c2 cannot be consecutive. + kv.Version = out.newMinTs + 1 kv.Key = alloc.Copy(l.key) kvs = append(kvs, kv) @@ -997,7 +1006,7 @@ func (out *rollupOutput) marshalPostingListPart(alloc *z.Allocator, hex.EncodeToString(baseKey), startUid) } kv := MarshalPostingList(plist, alloc) - kv.Version = out.newMinTs + kv.Version = out.newMinTs + 1 kv.Key = alloc.Copy(key) return kv, nil } diff --git a/posting/list_test.go b/posting/list_test.go index 3acaba0c452..d88956898d6 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -471,7 +471,7 @@ func TestMillion(t *testing.T) { } t.Logf("Completed a million writes.\n") - opt := ListOptions{ReadTs: uint64(N) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} bm, err := ol.Bitmap(opt) require.NoError(t, err) require.Equal(t, commits, bm.GetCardinality()) @@ -887,6 +887,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) commits := 0 + curTs := 1 for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), @@ -897,11 +898,11 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { edge.Facets = []*api.Facet{{Key: strconv.Itoa(i)}} } - txn := Txn{StartTs: uint64(i)} + txn := Txn{StartTs: uint64(curTs)} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + require.NoError(t, ol.commitMutation(uint64(curTs), uint64(curTs)+1)) if i%2000 == 0 { - t.Logf("Rolling up keys. i=%d\n", i) + curTs++ kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) @@ -909,12 +910,13 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { require.NoError(t, err) } commits++ + curTs++ } kvs, err := ol.Rollup(nil) require.NoError(t, err) for _, kv := range kvs { - require.Equal(t, uint64(size+1), kv.Version) + require.Equal(t, uint64(curTs+1), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) @@ -1075,7 +1077,7 @@ func TestMultiPartListBasic(t *testing.T) { // size := int(1e5) size := int(6000) ol, commits := createMultiPartList(t, size, false) - opt := ListOptions{ReadTs: uint64(size) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} l, err := ol.Uids(opt) require.NoError(t, err) uids := codec.GetUids(l) @@ -1117,7 +1119,7 @@ func TestBinSplit(t *testing.T) { require.NoError(t, err) t.Logf("Num KVs: %d\n", len(kvs)) for _, kv := range kvs { - require.Equal(t, uint64(size+1), kv.Version) + require.Equal(t, uint64(size+2), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) @@ -1210,7 +1212,7 @@ func TestMultiPartListIterAfterUid(t *testing.T) { after := 2000 bm, err := ol.Bitmap(ListOptions{ - ReadTs: uint64(size + 1), + ReadTs: math.MaxUint64, AfterUid: uint64(after), }) require.NoError(t, err) @@ -1228,7 +1230,7 @@ func TestMultiPartListWithPostings(t *testing.T) { ol, commits := createMultiPartList(t, size, true) var facets []string - err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Facets) > 0 { facets = append(facets, p.Facets[0].Key) } @@ -1267,7 +1269,7 @@ func TestMultiPartListMarshal(t *testing.T) { require.NoError(t, err) require.Equal(t, data, kvs[i+1].Value) require.Equal(t, []byte{BitCompletePosting}, kvs[i+1].UserMeta) - require.Equal(t, ol.minTs, kvs[i+1].Version) + require.Equal(t, ol.minTs+1, kvs[i+1].Version) } } @@ -1286,7 +1288,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) { newList, err := getNew(kvs[0].Key, ps, math.MaxUint64) require.NoError(t, err) - opt := ListOptions{ReadTs: uint64(size) + 1} + opt := ListOptions{ReadTs: math.MaxUint64} originalUids, err := originalList.Uids(opt) require.NoError(t, err) newUids, err := newList.Uids(opt) @@ -1321,7 +1323,7 @@ func TestMultiPartListDelete(t *testing.T) { for _, kv := range kvs { require.Equal(t, []byte{BitEmptyPosting}, kv.UserMeta) - require.Equal(t, ol.minTs, kv.Version) + require.Equal(t, ol.minTs+1, kv.Version) } } @@ -1338,21 +1340,24 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) + var curTs uint64 for i := 1; i <= size; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: uint64(i)} + txn := Txn{StartTs: uint64(curTs)} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Verify all entries are in the list. @@ -1366,31 +1371,33 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { } // Delete the first half of the previously inserted entries from the list. - baseStartTs := uint64(size) + 1 for i := 1; i <= size/2; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: baseStartTs + uint64(i)} + txn := Txn{StartTs: curTs} addMutationHelper(t, ol, edge, Del, &txn) - require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Rollup list at the end of all the deletions. + curTs++ kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) for _, kv := range kvs { - require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version) + require.Equal(t, curTs, kv.Version) } // Verify that the entries were actually deleted. opt = ListOptions{ReadTs: math.MaxUint64} @@ -1403,22 +1410,23 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { } // Re-add the entries that were just deleted. - baseStartTs = uint64(2*size) + 1 for i := 1; i <= size/2; i++ { edge := &pb.DirectedEdge{ ValueId: uint64(i), } - txn := Txn{StartTs: baseStartTs + uint64(i)} + txn := Txn{StartTs: curTs} addMutationHelper(t, ol, edge, Set, &txn) - require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1)) + require.NoError(t, ol.commitMutation(curTs, curTs+1)) if i%2000 == 0 { + curTs++ kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } + curTs++ } // Rollup list at the end of all the additions @@ -1447,7 +1455,7 @@ func TestSingleListRollup(t *testing.T) { ol, commits := createMultiPartList(t, size, true) var facets []string - err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Facets) > 0 { facets = append(facets, p.Facets[0].Key) } @@ -1508,7 +1516,7 @@ func TestRecursiveSplits(t *testing.T) { // Read back the list and verify the data is correct. var facets []string - err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error { + err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error { if len(p.Facets) > 0 { facets = append(facets, p.Facets[0].Key) } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index 622613dd30e..5d97d60476a 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -60,7 +60,7 @@ func TestRollupTimestamp(t *testing.T) { // delete marker being the most recent update. kvs, err := nl.Rollup(nil) require.NoError(t, err) - require.Equal(t, uint64(10), kvs[0].Version) + require.Equal(t, uint64(11), kvs[0].Version) } func TestPostingListRead(t *testing.T) {