Skip to content

Commit

Permalink
fix(rollups): Write rolled-up keys at ts+1 (#7957) (#7959)
Browse files Browse the repository at this point in the history
Write rolled-up keys at (max ts of the deltas + 1). If we write the
rolled-up keys at the same ts as the delta, then on WAL replay the
rolled-up key would get overwritten by the delta, which can leave the
DB in an invalid state.

(cherry picked from commit 3831b49)
  • Loading branch information
ahsanbarkati committed Sep 3, 2021
1 parent b3e416e commit 4b6b97d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
15 changes: 13 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,12 +822,23 @@ func (l *List) Rollup() ([]*bpb.KV, error) {
}

var kvs []*bpb.KV

// We set kv.Version to newMinTs + 1 because if we write the rolled-up keys at the same ts as
// that of the delta, then in case of WAL replay the rolled-up key would get overwritten by the
// delta, which can bring the DB to an invalid state.
// It is fine to write the rolled-up key at ts+1, and this key won't be overwritten by any
// other delta, because there cannot be a commit at both ts and ts+1 on the same key. The reason
// is as follows:
// Suppose there are two interleaved txns [s1 s2 c1 c2], where si and ci are the start and commit
// ts of the i'th txn. In this case c2 would not have happened because of the conflict.
// Suppose there are two disjoint txns [s1 c1 s2 c2]. Then c1 and c2 cannot be consecutive,
// because s2 lies between them.
kv := &bpb.KV{}
kv.Version = out.newMinTs
kv.Version = out.newMinTs + 1
kv.Key = l.key
val, meta := marshalPostingList(out.plist)
kv.UserMeta = []byte{meta}
kv.Value = val

kvs = append(kvs, kv)

for startUid, plist := range out.parts {
Expand Down Expand Up @@ -879,7 +890,7 @@ func (l *List) SingleListRollup(kv *bpb.KV) error {
func (out *rollupOutput) marshalPostingListPart(
baseKey []byte, startUid uint64, plist *pb.PostingList) (*bpb.KV, error) {
kv := &bpb.KV{}
kv.Version = out.newMinTs
kv.Version = out.newMinTs + 1
key, err := x.SplitKey(baseKey, startUid)
if err != nil {
return nil, errors.Wrapf(err,
Expand Down
53 changes: 31 additions & 22 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func TestMillion(t *testing.T) {
}

t.Logf("Completed a million writes.\n")
opt := ListOptions{ReadTs: uint64(N) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
l, err := ol.Uids(opt)
require.NoError(t, err)
require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids)
Expand Down Expand Up @@ -916,6 +916,7 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
curTs := 1
for i := 1; i <= size; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
Expand All @@ -924,22 +925,24 @@ func createMultiPartList(t *testing.T, size int, addLabel bool) (*List, int) {
edge.Label = strconv.Itoa(i)
}

txn := Txn{StartTs: uint64(i)}
txn := Txn{StartTs: uint64(curTs)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
require.NoError(t, ol.commitMutation(uint64(curTs), uint64(curTs)+1))
if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
curTs++
}

kvs, err := ol.Rollup()
for _, kv := range kvs {
require.Equal(t, uint64(size+1), kv.Version)
require.Equal(t, uint64(curTs+1), kv.Version)
}
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
Expand Down Expand Up @@ -1021,7 +1024,7 @@ func writePostingListToDisk(kvs []*bpb.KV) error {
func TestMultiPartListBasic(t *testing.T) {
size := int(1e5)
ol, commits := createMultiPartList(t, size, false)
opt := ListOptions{ReadTs: uint64(size) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
l, err := ol.Uids(opt)
require.NoError(t, err)
require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids)
Expand All @@ -1036,7 +1039,7 @@ func TestMultiPartListIterAfterUid(t *testing.T) {
ol, _ := createMultiPartList(t, size, false)

var visitedUids []uint64
ol.Iterate(uint64(size+1), 50000, func(p *pb.Posting) error {
ol.Iterate(math.MaxUint64, 50000, func(p *pb.Posting) error {
visitedUids = append(visitedUids, p.Uid)
return nil
})
Expand All @@ -1052,7 +1055,7 @@ func TestMultiPartListWithPostings(t *testing.T) {
ol, commits := createMultiPartList(t, size, true)

var labels []string
err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Label) > 0 {
labels = append(labels, p.Label)
}
Expand Down Expand Up @@ -1089,7 +1092,7 @@ func TestMultiPartListMarshal(t *testing.T) {
require.NoError(t, err)
require.Equal(t, data, kvs[i+1].Value)
require.Equal(t, []byte{BitCompletePosting}, kvs[i+1].UserMeta)
require.Equal(t, ol.minTs, kvs[i+1].Version)
require.Equal(t, ol.minTs+1, kvs[i+1].Version)
}
}

Expand All @@ -1106,7 +1109,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
newList, err := getNew(kvs[0].Key, ps, math.MaxUint64)
require.NoError(t, err)

opt := ListOptions{ReadTs: uint64(size) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
originalUids, err := originalList.Uids(opt)
require.NoError(t, err)
newUids, err := newList.Uids(opt)
Expand Down Expand Up @@ -1137,7 +1140,7 @@ func TestMultiPartListDelete(t *testing.T) {

for _, kv := range kvs {
require.Equal(t, []byte{BitEmptyPosting}, kv.UserMeta)
require.Equal(t, ol.minTs, kv.Version)
require.Equal(t, ol.minTs+1, kv.Version)
}
}

Expand All @@ -1156,21 +1159,24 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
key := x.DataKey(uuid.New().String(), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
var curTs uint64
for i := 1; i <= size; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}

txn := Txn{StartTs: uint64(i)}
txn := Txn{StartTs: uint64(curTs)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))
if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Verify all entries are in the list.
Expand All @@ -1183,31 +1189,33 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Delete the first half of the previously inserted entries from the list.
baseStartTs := uint64(size) + 1
for i := 1; i <= size/2; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}
txn := Txn{StartTs: baseStartTs + uint64(i)}
txn := Txn{StartTs: curTs}
addMutationHelper(t, ol, edge, Del, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))
if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Rollup list at the end of all the deletions.
curTs++
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version)
require.Equal(t, curTs, kv.Version)
}
// Verify that the entries were actually deleted.
opt = ListOptions{ReadTs: math.MaxUint64}
Expand All @@ -1219,22 +1227,23 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Re-add the entries that were just deleted.
baseStartTs = uint64(2*size) + 1
for i := 1; i <= 50000; i++ {
for i := 1; i <= size/2; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}
txn := Txn{StartTs: baseStartTs + uint64(i)}
txn := Txn{StartTs: curTs}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))

if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup()
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Rollup list at the end of all the additions
Expand Down Expand Up @@ -1272,7 +1281,7 @@ func TestSingleListRollup(t *testing.T) {
require.Equal(t, 0, len(plist.Splits))

var labels []string
err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Label) > 0 {
labels = append(labels, p.Label)
}
Expand Down Expand Up @@ -1323,7 +1332,7 @@ func TestRecursiveSplits(t *testing.T) {

// Read back the list and verify the data is correct.
var labels []string
err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Label) > 0 {
labels = append(labels, p.Label)
}
Expand Down
2 changes: 1 addition & 1 deletion posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestRollupTimestamp(t *testing.T) {
// delete marker being the most recent update.
kvs, err := nl.Rollup()
require.NoError(t, err)
require.Equal(t, uint64(10), kvs[0].Version)
require.Equal(t, uint64(11), kvs[0].Version)
}

func TestPostingListRead(t *testing.T) {
Expand Down

0 comments on commit 4b6b97d

Please sign in to comment.