Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rollups): Write rolled-up keys at ts+1 (#7957) #7959

Merged
merged 1 commit into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,16 @@ func (l *List) Rollup(alloc *z.Allocator) ([]*bpb.KV, error) {

var kvs []*bpb.KV
kv := MarshalPostingList(out.plist, alloc)
kv.Version = out.newMinTs
// We set kv.Version to newMinTs + 1 because if we write the rolled up keys at the same ts as
// that of the delta, then in case of wal replay the rolled up key would get over-written by the
// delta which can bring db to an invalid state.
// It would be fine to write rolled up key at ts+1 and this key won't be overwritten by any
// other delta because there cannot be commit at ts as well as ts+1 on the same key. The reason
// is as follows:
// Suppose there are two inter-leaved txns [s1 s2 c1 c2] where si, ci is the start and commit
// of the i'th txn. In this case c2 would not have happened because of conflict.
// Suppose there are two disjoint txns [s1 c1 s2 c2], then c1 and c2 cannot be consecutive.
kv.Version = out.newMinTs + 1
kv.Key = alloc.Copy(l.key)
kvs = append(kvs, kv)

Expand Down Expand Up @@ -997,7 +1006,7 @@ func (out *rollupOutput) marshalPostingListPart(alloc *z.Allocator,
hex.EncodeToString(baseKey), startUid)
}
kv := MarshalPostingList(plist, alloc)
kv.Version = out.newMinTs
kv.Version = out.newMinTs + 1
kv.Key = alloc.Copy(key)
return kv, nil
}
Expand Down
54 changes: 31 additions & 23 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestMillion(t *testing.T) {
}

t.Logf("Completed a million writes.\n")
opt := ListOptions{ReadTs: uint64(N) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
bm, err := ol.Bitmap(opt)
require.NoError(t, err)
require.Equal(t, commits, bm.GetCardinality())
Expand Down Expand Up @@ -887,6 +887,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
curTs := 1
for i := 1; i <= size; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
Expand All @@ -897,24 +898,25 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
edge.Facets = []*api.Facet{{Key: strconv.Itoa(i)}}
}

txn := Txn{StartTs: uint64(i)}
txn := Txn{StartTs: uint64(curTs)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
require.NoError(t, ol.commitMutation(uint64(curTs), uint64(curTs)+1))
if i%2000 == 0 {
t.Logf("Rolling up keys. i=%d\n", i)
curTs++
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
curTs++
}

kvs, err := ol.Rollup(nil)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, uint64(size+1), kv.Version)
require.Equal(t, uint64(curTs+1), kv.Version)
}
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1075,7 +1077,7 @@ func TestMultiPartListBasic(t *testing.T) {
// size := int(1e5)
size := int(6000)
ol, commits := createMultiPartList(t, size, false)
opt := ListOptions{ReadTs: uint64(size) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
l, err := ol.Uids(opt)
require.NoError(t, err)
uids := codec.GetUids(l)
Expand Down Expand Up @@ -1117,7 +1119,7 @@ func TestBinSplit(t *testing.T) {
require.NoError(t, err)
t.Logf("Num KVs: %d\n", len(kvs))
for _, kv := range kvs {
require.Equal(t, uint64(size+1), kv.Version)
require.Equal(t, uint64(size+2), kv.Version)
}
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
Expand Down Expand Up @@ -1210,7 +1212,7 @@ func TestMultiPartListIterAfterUid(t *testing.T) {

after := 2000
bm, err := ol.Bitmap(ListOptions{
ReadTs: uint64(size + 1),
ReadTs: math.MaxUint64,
AfterUid: uint64(after),
})
require.NoError(t, err)
Expand All @@ -1228,7 +1230,7 @@ func TestMultiPartListWithPostings(t *testing.T) {
ol, commits := createMultiPartList(t, size, true)

var facets []string
err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Facets) > 0 {
facets = append(facets, p.Facets[0].Key)
}
Expand Down Expand Up @@ -1267,7 +1269,7 @@ func TestMultiPartListMarshal(t *testing.T) {
require.NoError(t, err)
require.Equal(t, data, kvs[i+1].Value)
require.Equal(t, []byte{BitCompletePosting}, kvs[i+1].UserMeta)
require.Equal(t, ol.minTs, kvs[i+1].Version)
require.Equal(t, ol.minTs+1, kvs[i+1].Version)
}
}

Expand All @@ -1286,7 +1288,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
newList, err := getNew(kvs[0].Key, ps, math.MaxUint64)
require.NoError(t, err)

opt := ListOptions{ReadTs: uint64(size) + 1}
opt := ListOptions{ReadTs: math.MaxUint64}
originalUids, err := originalList.Uids(opt)
require.NoError(t, err)
newUids, err := newList.Uids(opt)
Expand Down Expand Up @@ -1321,7 +1323,7 @@ func TestMultiPartListDelete(t *testing.T) {

for _, kv := range kvs {
require.Equal(t, []byte{BitEmptyPosting}, kv.UserMeta)
require.Equal(t, ol.minTs, kv.Version)
require.Equal(t, ol.minTs+1, kv.Version)
}
}

Expand All @@ -1338,21 +1340,24 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
var curTs uint64
for i := 1; i <= size; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}

txn := Txn{StartTs: uint64(i)}
txn := Txn{StartTs: uint64(curTs)}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))
if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Verify all entries are in the list.
Expand All @@ -1366,31 +1371,33 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Delete the first half of the previously inserted entries from the list.
baseStartTs := uint64(size) + 1
for i := 1; i <= size/2; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}
txn := Txn{StartTs: baseStartTs + uint64(i)}
txn := Txn{StartTs: curTs}
addMutationHelper(t, ol, edge, Del, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))
if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Rollup list at the end of all the deletions.
curTs++
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, baseStartTs+uint64(1+size/2), kv.Version)
require.Equal(t, curTs, kv.Version)
}
// Verify that the entries were actually deleted.
opt = ListOptions{ReadTs: math.MaxUint64}
Expand All @@ -1403,22 +1410,23 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}

// Re-add the entries that were just deleted.
baseStartTs = uint64(2*size) + 1
for i := 1; i <= size/2; i++ {
edge := &pb.DirectedEdge{
ValueId: uint64(i),
}
txn := Txn{StartTs: baseStartTs + uint64(i)}
txn := Txn{StartTs: curTs}
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(baseStartTs+uint64(i), baseStartTs+uint64(i)+1))
require.NoError(t, ol.commitMutation(curTs, curTs+1))

if i%2000 == 0 {
curTs++
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
}

// Rollup list at the end of all the additions
Expand Down Expand Up @@ -1447,7 +1455,7 @@ func TestSingleListRollup(t *testing.T) {
ol, commits := createMultiPartList(t, size, true)

var facets []string
err := ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err := ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Facets) > 0 {
facets = append(facets, p.Facets[0].Key)
}
Expand Down Expand Up @@ -1508,7 +1516,7 @@ func TestRecursiveSplits(t *testing.T) {

// Read back the list and verify the data is correct.
var facets []string
err = ol.Iterate(uint64(size)+1, 0, func(p *pb.Posting) error {
err = ol.Iterate(math.MaxUint64, 0, func(p *pb.Posting) error {
if len(p.Facets) > 0 {
facets = append(facets, p.Facets[0].Key)
}
Expand Down
2 changes: 1 addition & 1 deletion posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestRollupTimestamp(t *testing.T) {
// delete marker being the most recent update.
kvs, err := nl.Rollup(nil)
require.NoError(t, err)
require.Equal(t, uint64(10), kvs[0].Version)
require.Equal(t, uint64(11), kvs[0].Version)
}

func TestPostingListRead(t *testing.T) {
Expand Down