Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed Oct 21, 2024
1 parent fccb46e commit 6fb5e6d
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 87 deletions.
25 changes: 21 additions & 4 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,28 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
return err
}

x.AssertTrue(plist != nil)
if err = plist.addMutation(ctx, txn, edge); err != nil {
return err
//if err = plist.addMutation(ctx, txn, edge); err != nil {
// return err
//}

mpost := NewPosting(edge)
mpost.StartTs = txn.StartTs
if mpost.PostingType != pb.Posting_REF {
edge.ValueId = fingerprintEdge(edge)
mpost.Uid = edge.ValueId
}
ostats.Record(ctx, x.NumEdges.M(1))

//fmt.Println("ADDING MUTATION", plist.mutationMap, key, edge)
txn.addConflictKey(indexConflicKey(key, edge))

plist.Lock()
defer plist.Unlock()
if err != plist.updateMutationLayer(mpost, false) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
hex.EncodeToString(plist.key), mpost)
}

//ostats.Record(ctx, x.NumEdges.M(1))
return nil
}

Expand Down
45 changes: 39 additions & 6 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
"math"
"sort"
Expand Down Expand Up @@ -96,6 +97,7 @@ type MutableMap struct {
deleteMarker uint64
uidMap map[uint64]int
uidsH map[uint64]*pb.Posting
uidsHTime uint64
length int
}

Expand All @@ -106,6 +108,7 @@ func newMutableMap() *MutableMap {
deleteMarker: math.MaxUint64,
length: math.MaxInt,
uidsH: make(map[uint64]*pb.Posting),
uidsHTime: math.MaxUint64,
}
}

Expand All @@ -119,6 +122,7 @@ func (mm *MutableMap) clone() *MutableMap {
deleteMarker: mm.deleteMarker,
uidsH: mm.uidsH,
length: mm.length,
uidsHTime: mm.uidsHTime,
}
}

Expand Down Expand Up @@ -168,9 +172,10 @@ func (mm *MutableMap) listLen(readTs uint64) int {
return 0
}

if mm.length == math.MaxInt {
if mm.length == math.MaxInt || readTs < mm.uidsHTime {
count := 0
mm.iterate(func(ts uint64, pl *pb.PostingList) {
//fmt.Println(ts, pl)
for _, mpost := range pl.Postings {
if mpost.Op == Del {
count -= 1
Expand All @@ -182,9 +187,11 @@ func (mm *MutableMap) listLen(readTs uint64) int {
return count
}

//fmt.Println("here", mm.length)
count := mm.length
if mm.curList != nil {
for _, mpost := range mm.curList.Postings {
fmt.Println(mpost)
if mpost.Op == Del {
count -= 1
} else {
Expand Down Expand Up @@ -250,7 +257,8 @@ func (mm *MutableMap) iterate(f func(ts uint64, pl *pb.PostingList), readTs uint

deleteMarker := mm.populateDeleteAll(readTs)
mm._iterate(func(ts uint64, pl *pb.PostingList) {
if ts >= deleteMarker {
//fmt.Println("********MUTABLE MAP ITERATE", ts, readTs, pl)
if ts >= deleteMarker && ts <= readTs {
f(ts, pl)
}
})
Expand All @@ -269,6 +277,10 @@ func (mm *MutableMap) insertOldPosting(pl *pb.PostingList) {
if _, ok := mm.uidsH[mpost.Uid]; !ok {
mm.uidsH[mpost.Uid] = mpost
}
mm.uidsHTime = x.Max(mpost.CommitTs, mm.uidsHTime)
if mm.length == math.MaxInt64 {
mm.length = 0
}
if mpost.Op == Del {
mm.length -= 1
} else {
Expand Down Expand Up @@ -317,14 +329,14 @@ func (mm *MutableMap) findPosting(readTs, uid uint64) (bool, *pb.Posting) {
}

getPos := func() *pb.Posting {
pos, ok := mm.uidsH[uid]
if ok {
return pos
}
posI, ok := mm.uidMap[uid]
if ok {
return mm.curList.Postings[posI]
}
pos, ok := mm.uidsH[uid]
if ok {
return pos
}
return nil
}

Expand Down Expand Up @@ -704,6 +716,17 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er
return l.addMutationInternal(ctx, txn, t)
}

func indexConflicKey(key []byte, t *pb.DirectedEdge) uint64 {
getKey := func(key []byte, uid uint64) uint64 {
// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
// here to save memory allocations.
// Not entirely sure about effect on collision chances due to this simple XOR with uid.
return farm.Fingerprint64(key) ^ uid
}

return getKey(key, t.ValueId)
}

func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
getKey := func(key []byte, uid uint64) uint64 {
// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
Expand Down Expand Up @@ -846,6 +869,7 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
}

l.mutationMap.uidsH[mpost.Uid] = mpost
l.mutationMap.uidsHTime = x.Max(l.mutationMap.uidsHTime, commitTs)
if mpost.Op == Del {
l.mutationMap.length -= 1
} else {
Expand Down Expand Up @@ -942,6 +966,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
//fmt.Println(mposts, deleteBelowTs, l.plist)
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
}
Expand Down Expand Up @@ -1243,6 +1268,7 @@ func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) {
return bytes.Compare(kvs[i].Key, kvs[j].Key) <= 0
})

//fmt.Println("ROLLING UP", l.key, out.plist, kv.Version)
x.PrintRollup(out.plist, out.parts, l.key, kv.Version)
x.VerifyPostingSplits(kvs, out.plist, out.parts, l.key)
return kvs, nil
Expand Down Expand Up @@ -1662,19 +1688,23 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
if l.mutationMap == nil {
// If mutation map is empty, check if there is some data, and return it.
if l.plist != nil && len(l.plist.Postings) > 0 {
//fmt.Println("nil map plist")
return l.plist
}
//fmt.Println("nil map nil")
return nil
}

// Return readTs is if it's present in the mutation. It's going to be the latest value.
if l.mutationMap.curList != nil && l.mutationMap.curTime == readTs {
//fmt.Println("curlist", l.mutationMap.curList)
return l.mutationMap.curList
}

// If maxTs < readTs then we need to read maxTs
if l.maxTs <= readTs {
if mutation := l.mutationMap.get(l.maxTs); mutation != nil {
//fmt.Println("mutation", mutation)
return mutation
}
}
Expand All @@ -1690,11 +1720,13 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
}
}, readTs)
if mutation != nil {
//fmt.Println("iterate", mutation)
return mutation
}

// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
// means we need to return l.plist
//fmt.Println("got nothing", l.plist, l.plist != nil)
return l.plist
}

Expand Down Expand Up @@ -1856,6 +1888,7 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti
// Iterate starts iterating after the given argument, so we pass UID - 1
// TODO Find what happens when uid = math.MaxUint64
searchFurther, pos := l.mutationMap.findPosting(readTs, uid)
//fmt.Println("FIND POSTING", readTs, "key", l.key, "mutationMap:", l.mutationMap, "plist:", l.plist, uid, searchFurther, pos)
if pos != nil {
return true, pos, nil
}
Expand Down
10 changes: 7 additions & 3 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func TestGetSinglePosting(t *testing.T) {

res, err := l.StaticValue(1)
require.NoError(t, err)
//fmt.Println(res, res == nil)
require.Equal(t, res == nil, true)

l.plist = create_pl(1, 1)
Expand Down Expand Up @@ -225,6 +226,7 @@ func TestAddMutation(t *testing.T) {

func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {
require.NoError(t, l.Iterate(readTs, 0, func(p *pb.Posting) error {
//fmt.Println("INSIDE ITERATE", p)
res = *p
return ErrStopIteration
}))
Expand All @@ -233,6 +235,7 @@ func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {

func checkValue(t *testing.T, ol *List, val string, readTs uint64) {
p := getFirst(t, ol, readTs)
//fmt.Println("HERE", val, string(p.Value), p, ol, p.Uid)
require.Equal(t, uint64(math.MaxUint64), p.Uid) // Cast to prevent overflow.
require.EqualValues(t, val, p.Value)
}
Expand Down Expand Up @@ -522,7 +525,7 @@ func TestReadSingleValue(t *testing.T) {
require.NoError(t, ol.commitMutation(i, i+1))
kData := ol.getMutation(i + 1)
writer := NewTxnWriter(pstore)
if err := writer.SetAt(key, kData, BitDeltaPosting, i); err != nil {
if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil {
require.NoError(t, err)
}
writer.Flush()
Expand All @@ -533,16 +536,17 @@ func TestReadSingleValue(t *testing.T) {
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete item from global cache before reading, as we are not updating the cache in the test
globalCache.Del(z.MemHash(key))
memoryLayer.Del(z.MemHash(key))
ol, err = getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
}

j := uint64(2)
j := uint64(3)
if j < ol.minTs {
j = ol.minTs
}
for ; j < i+6; j++ {
ResetCache()
tx := NewTxn(j)
k, err := tx.cache.GetSinglePosting(key)
require.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {

pl := &pb.PostingList{}
if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
//fmt.Println("GETTING FROM DELTAS")
err := pl.Unmarshal(delta)
lc.RUnlock()
return pl, err
Expand All @@ -357,6 +358,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
// If both pl and err are empty, that means that there was no data in local cache, hence we should
// read the data from badger.
if pl != nil || err != nil {
//fmt.Println("GETTING POSTING1", lc.startTs, pl)
return pl, err
}

Expand All @@ -373,6 +375,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
return pl.Unmarshal(val)
})

//fmt.Println("GETTING POSTING FROM BADGER", lc.startTs, pl)
return pl, err
}

Expand Down
Loading

0 comments on commit 6fb5e6d

Please sign in to comment.