diff --git a/posting/index.go b/posting/index.go
index 6ab2dbd91d8..9ee778c0008 100644
--- a/posting/index.go
+++ b/posting/index.go
@@ -217,11 +217,28 @@ func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, tok
 		return err
 	}
 
-	x.AssertTrue(plist != nil)
-	if err = plist.addMutation(ctx, txn, edge); err != nil {
-		return err
+	//if err = plist.addMutation(ctx, txn, edge); err != nil {
+	//	return err
+	//}
+
+	mpost := NewPosting(edge)
+	mpost.StartTs = txn.StartTs
+	if mpost.PostingType != pb.Posting_REF {
+		edge.ValueId = fingerprintEdge(edge)
+		mpost.Uid = edge.ValueId
 	}
-	ostats.Record(ctx, x.NumEdges.M(1))
+
+	//fmt.Println("ADDING MUTATION", plist.mutationMap, key, edge)
+	txn.addConflictKey(indexConflictKey(key, edge))
+
+	plist.Lock()
+	defer plist.Unlock()
+	if err := plist.updateMutationLayer(mpost, false); err != nil {
+		return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
+			hex.EncodeToString(plist.key), mpost)
+	}
+
+	//ostats.Record(ctx, x.NumEdges.M(1))
 	return nil
 }
diff --git a/posting/list.go b/posting/list.go
index 5e51aad022d..27b6c7848cf 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -96,6 +97,7 @@ type MutableMap struct {
 	deleteMarker uint64
 	uidMap       map[uint64]int
 	uidsH        map[uint64]*pb.Posting
+	uidsHTime    uint64
 	length       int
 }
@@ -106,6 +108,7 @@ func newMutableMap() *MutableMap {
 		deleteMarker: math.MaxUint64,
 		length:       math.MaxInt,
 		uidsH:        make(map[uint64]*pb.Posting),
+		uidsHTime:    math.MaxUint64,
 	}
 }
@@ -119,6 +122,7 @@ func (mm *MutableMap) clone() *MutableMap {
 		deleteMarker: mm.deleteMarker,
 		uidsH:        mm.uidsH,
 		length:       mm.length,
+		uidsHTime:    mm.uidsHTime,
 	}
 }
@@ -168,9 +172,10 @@ func (mm *MutableMap) listLen(readTs uint64) int {
 		return 0
 	}
 
-	if mm.length == math.MaxInt {
+	if mm.length == math.MaxInt || readTs < mm.uidsHTime {
 		count := 0
 		mm.iterate(func(ts uint64, pl *pb.PostingList) {
+			//fmt.Println(ts, pl)
 			for _, mpost := range pl.Postings {
 				if mpost.Op == Del {
 					count -= 1
@@ -182,9 +187,11 @@ func (mm *MutableMap) listLen(readTs uint64) int {
 		return count
 	}
 
+	//fmt.Println("here", mm.length)
 	count := mm.length
 	if mm.curList != nil {
 		for _, mpost := range mm.curList.Postings {
+			//fmt.Println(mpost)
 			if mpost.Op == Del {
 				count -= 1
 			} else {
@@ -250,7 +257,8 @@ func (mm *MutableMap) iterate(f func(ts uint64, pl *pb.PostingList), readTs uint
 	deleteMarker := mm.populateDeleteAll(readTs)
 	mm._iterate(func(ts uint64, pl *pb.PostingList) {
-		if ts >= deleteMarker {
+		//fmt.Println("********MUTABLE MAP ITERATE", ts, readTs, pl)
+		if ts >= deleteMarker && ts <= readTs {
 			f(ts, pl)
 		}
 	})
@@ -269,6 +277,10 @@ func (mm *MutableMap) insertOldPosting(pl *pb.PostingList) {
 		if _, ok := mm.uidsH[mpost.Uid]; !ok {
 			mm.uidsH[mpost.Uid] = mpost
 		}
+		// uidsHTime starts at math.MaxUint64 as an "unset" sentinel, so a plain
+		// x.Max would never move it; seed it on first use instead.
+		if mm.uidsHTime == math.MaxUint64 {
+			mm.uidsHTime = mpost.CommitTs
+		} else {
+			mm.uidsHTime = x.Max(mpost.CommitTs, mm.uidsHTime)
+		}
+		if mm.length == math.MaxInt {
+			mm.length = 0
+		}
 		if mpost.Op == Del {
 			mm.length -= 1
 		} else {
@@ -317,14 +329,14 @@ func (mm *MutableMap) findPosting(readTs, uid uint64) (bool, *pb.Posting) {
 	}
 
 	getPos := func() *pb.Posting {
-		pos, ok := mm.uidsH[uid]
-		if ok {
-			return pos
-		}
 		posI, ok := mm.uidMap[uid]
 		if ok {
 			return mm.curList.Postings[posI]
 		}
+		pos, ok := mm.uidsH[uid]
+		if ok {
+			return pos
+		}
 		return nil
 	}
@@ -704,6 +716,17 @@ func (l *List) addMutation(ctx context.Context, txn *Txn, t *pb.DirectedEdge) er
 	return l.addMutationInternal(ctx, txn, t)
 }
 
+func indexConflictKey(key []byte, t *pb.DirectedEdge) uint64 {
+	getKey := func(key []byte, uid uint64) uint64 {
+		// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
+		// here to save memory allocations.
+		// Not entirely sure about effect on collision chances due to this simple XOR with uid.
+		return farm.Fingerprint64(key) ^ uid
+	}
+
+	return getKey(key, t.ValueId)
+}
+
 func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 {
 	getKey := func(key []byte, uid uint64) uint64 {
 		// Instead of creating a string first and then doing a fingerprint, let's do a fingerprint
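Reviewer note: `indexConflictKey` above and `GetConflictKey` below share the same fingerprint-XOR scheme. A self-contained sketch of that scheme follows; the helper name and sample key here are illustrative, not part of the patch:

```go
package main

import (
	"fmt"

	farm "github.com/dgryski/go-farm"
)

// conflictKey mirrors the fingerprint-XOR trick: hash the index key once and
// XOR in the uid, avoiding an intermediate string allocation per edge.
func conflictKey(key []byte, uid uint64) uint64 {
	return farm.Fingerprint64(key) ^ uid
}

func main() {
	key := []byte("some-index-key")
	// Two writers touching the same index key and uid produce the same
	// conflict key, so the transaction layer can detect the overlap.
	fmt.Println(conflictKey(key, 42) == conflictKey(key, 42)) // true
	// A different uid (modulo rare collisions) yields a different key.
	fmt.Println(conflictKey(key, 42) == conflictKey(key, 43)) // false
}
```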
@@ -846,6 +869,7 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi
 	}
 
 	l.mutationMap.uidsH[mpost.Uid] = mpost
+	l.mutationMap.uidsHTime = x.Max(l.mutationMap.uidsHTime, commitTs)
 	if mpost.Op == Del {
 		l.mutationMap.length -= 1
 	} else {
@@ -942,6 +966,7 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
 
 	// mposts is the list of mutable postings
 	deleteBelowTs, mposts := l.pickPostings(readTs)
+	//fmt.Println(mposts, deleteBelowTs, l.plist)
 	if readTs < l.minTs {
 		return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
 	}
@@ -1243,6 +1268,7 @@ func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) {
 		return bytes.Compare(kvs[i].Key, kvs[j].Key) <= 0
 	})
 
+	//fmt.Println("ROLLING UP", l.key, out.plist, kv.Version)
 	x.PrintRollup(out.plist, out.parts, l.key, kv.Version)
 	x.VerifyPostingSplits(kvs, out.plist, out.parts, l.key)
 	return kvs, nil
@@ -1662,19 +1688,23 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
 	if l.mutationMap == nil {
 		// If mutation map is empty, check if there is some data, and return it.
 		if l.plist != nil && len(l.plist.Postings) > 0 {
+			//fmt.Println("nil map plist")
 			return l.plist
 		}
+		//fmt.Println("nil map nil")
 		return nil
 	}
 
 	// Return readTs is if it's present in the mutation. It's going to be the latest value.
 	if l.mutationMap.curList != nil && l.mutationMap.curTime == readTs {
+		//fmt.Println("curlist", l.mutationMap.curList)
 		return l.mutationMap.curList
 	}
 
 	// If maxTs < readTs then we need to read maxTs
 	if l.maxTs <= readTs {
		if mutation := l.mutationMap.get(l.maxTs); mutation != nil {
+			//fmt.Println("mutation", mutation)
 			return mutation
 		}
 	}
@@ -1690,11 +1720,13 @@ func (l *List) findStaticValue(readTs uint64) *pb.PostingList {
 		}
 	}, readTs)
 	if mutation != nil {
+		//fmt.Println("iterate", mutation)
 		return mutation
 	}
 
 	// If we reach here, that means that there was no entry in mutation map which is less than readTs. That
 	// means we need to return l.plist
+	//fmt.Println("got nothing", l.plist, l.plist != nil)
 	return l.plist
 }
@@ -1856,6 +1888,7 @@ func (l *List) findPosting(readTs uint64, uid uint64) (found bool, pos *pb.Posti
 	// Iterate starts iterating after the given argument, so we pass UID - 1
 	// TODO Find what happens when uid = math.MaxUint64
 	searchFurther, pos := l.mutationMap.findPosting(readTs, uid)
+	//fmt.Println("FIND POSTING", readTs, "key", l.key, "mutationMap:", l.mutationMap, "plist:", l.plist, uid, searchFurther, pos)
 	if pos != nil {
 		return true, pos, nil
 	}
diff --git a/posting/list_test.go b/posting/list_test.go
index 2167095d088..0649f7073a2 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -134,6 +134,7 @@ func TestGetSinglePosting(t *testing.T) {
 	res, err := l.StaticValue(1)
 	require.NoError(t, err)
 
+	//fmt.Println(res, res == nil)
 	require.Equal(t, res == nil, true)
 
 	l.plist = create_pl(1, 1)
@@ -225,6 +226,7 @@ func TestAddMutation(t *testing.T) {
 func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {
 	require.NoError(t, l.Iterate(readTs, 0, func(p *pb.Posting) error {
+		//fmt.Println("INSIDE ITERATE", p)
 		res = *p
 		return ErrStopIteration
 	}))
@@ -233,6 +235,7 @@ func getFirst(t *testing.T, l *List, readTs uint64) (res pb.Posting) {
 func checkValue(t *testing.T, ol *List, val string, readTs uint64) {
 	p := getFirst(t, ol, readTs)
+	//fmt.Println("HERE", val, string(p.Value), p, ol, p.Uid)
 	require.Equal(t, uint64(math.MaxUint64), p.Uid) // Cast to prevent overflow.
 	require.EqualValues(t, val, p.Value)
 }
@@ -522,7 +525,7 @@ func TestReadSingleValue(t *testing.T) {
 		require.NoError(t, ol.commitMutation(i, i+1))
 		kData := ol.getMutation(i + 1)
 		writer := NewTxnWriter(pstore)
-		if err := writer.SetAt(key, kData, BitDeltaPosting, i); err != nil {
+		if err := writer.SetAt(key, kData, BitDeltaPosting, i+1); err != nil {
 			require.NoError(t, err)
 		}
 		writer.Flush()
@@ -533,16 +536,17 @@ func TestReadSingleValue(t *testing.T) {
 		require.NoError(t, err)
 		require.NoError(t, writePostingListToDisk(kvs))
 		// Delete item from global cache before reading, as we are not updating the cache in the test
-		globalCache.Del(z.MemHash(key))
+		memoryLayer.Del(z.MemHash(key))
 		ol, err = getNew(key, ps, math.MaxUint64)
 		require.NoError(t, err)
 	}
 
-	j := uint64(2)
+	j := uint64(3)
 	if j < ol.minTs {
 		j = ol.minTs
 	}
 	for ; j < i+6; j++ {
+		ResetCache()
 		tx := NewTxn(j)
 		k, err := tx.cache.GetSinglePosting(key)
 		require.NoError(t, err)
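Reviewer note: the `lists.go` hunks below adjust `LocalCache.GetSinglePosting`, which resolves a key in three layers: transaction-local deltas first, then the in-process copy, then Badger. A minimal sketch of that fallback order (the `store` type and its fields are invented for illustration, not the dgraph API):

```go
package main

import "fmt"

type store struct {
	deltas map[string][]byte // uncommitted writes of this txn
	cache  map[string][]byte // in-memory layer
	disk   map[string][]byte // stand-in for Badger
}

func (s *store) getSingle(key string) []byte {
	if v, ok := s.deltas[key]; ok && len(v) > 0 {
		return v // a local mutation always wins
	}
	if v, ok := s.cache[key]; ok {
		return v // served from memory, no disk read
	}
	return s.disk[key] // fall back to the backing store
}

func main() {
	s := &store{
		deltas: map[string][]byte{"a": []byte("txn")},
		cache:  map[string][]byte{"b": []byte("mem")},
		disk:   map[string][]byte{"c": []byte("disk")},
	}
	fmt.Printf("%s %s %s\n", s.getSingle("a"), s.getSingle("b"), s.getSingle("c"))
}
```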
diff --git a/posting/lists.go b/posting/lists.go
index 7a760135395..d1a398036d2 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -337,6 +337,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
 	pl := &pb.PostingList{}
 	if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
+		//fmt.Println("GETTING FROM DELTAS")
 		err := pl.Unmarshal(delta)
 		lc.RUnlock()
 		return pl, err
@@ -357,6 +358,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
 	// If both pl and err are empty, that means that there was no data in local cache, hence we should
 	// read the data from badger.
 	if pl != nil || err != nil {
+		//fmt.Println("GETTING POSTING1", lc.startTs, pl)
 		return pl, err
 	}
 
@@ -373,6 +375,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
 		return pl.Unmarshal(val)
 	})
 
+	//fmt.Println("GETTING POSTING FROM BADGER", lc.startTs, pl)
 	return pl, err
 }
diff --git a/posting/mvcc.go b/posting/mvcc.go
index c5e297520e9..f8d252430a4 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"math"
 	"runtime"
 	"strconv"
 	"sync"
+	"sync/atomic"
@@ -64,6 +65,7 @@ type CachePL struct {
 	count      int
 	list       *List
 	lastUpdate uint64
+	lastRead   uint64
 }
 
 var (
@@ -185,7 +187,7 @@ func (ir *incrRollupi) Process(closer *z.Closer, getNewTs func(bool) uint64) {
 	defer cleanupTick.Stop()
 	forceRollupTick := time.NewTicker(500 * time.Millisecond)
 	defer forceRollupTick.Stop()
-	deleteCacheTick := time.NewTicker(10 * time.Second)
+	deleteCacheTick := time.NewTicker(5 * time.Second)
 	defer deleteCacheTick.Stop()
 
 	doRollup := func(batch *[][]byte, priority int) {
@@ -357,13 +360,26 @@ func RemoveCacheFor(key []byte) {
 	}
 }
 
+type setItems struct {
+	keyHash uint64
+	list    *List
+	readTs  uint64
+}
+
 type MemoryLayer struct {
 	shards []*lockedMap
+	setBuf chan *setItems
+
+	// These counters are updated from concurrent readers, so they are
+	// accessed via sync/atomic to avoid data races.
+	numCacheRead      int64
+	numCacheReadFails int64
+	numDisksRead      int64
+	numCacheSave      int64
 }
 
 func initMemoryLayer() *MemoryLayer {
 	sm := &MemoryLayer{
 		shards: make([]*lockedMap, numShards),
+		setBuf: make(chan *setItems, 32*1024),
 	}
 	for i := range sm.shards {
 		sm.shards[i] = newLockedMap()
@@ -427,23 +443,6 @@ func (sm *MemoryLayer) Clear() {
 	}
 }
 
-func (sm *MemoryLayer) deleteOldItems(ts uint64) {
-	fmt.Println("Deleting old items")
-	defer func() {
-		fmt.Println("Done deleting old items")
-	}()
-	for i := 0; i < numShards; i++ {
-		sm.shards[i].Lock()
-		defer sm.shards[i].Unlock()
-
-		for keyHash, pl := range sm.shards[i].data {
-			if pl.lastUpdate < ts-100 {
-				delete(sm.shards[i].data, keyHash)
-			}
-		}
-	}
-}
-
 type lockedMap struct {
 	sync.RWMutex
 	data map[uint64]*CachePL
@@ -510,13 +509,11 @@ func NewCachePL() *CachePL {
 func checkForRollup(key []byte, l *List) {
 	deltaCount := l.mutationMap.len()
-	if deltaCount > 0 {
-		// If deltaCount is high, send it to high priority channel instead.
-		if deltaCount > 500 {
-			IncrRollup.addKeyToBatch(key, 0)
-		} else {
-			IncrRollup.addKeyToBatch(key, 1)
-		}
+	// If deltaCount is high, send it to high priority channel instead.
+	if deltaCount > 500 {
+		IncrRollup.addKeyToBatch(key, 0)
+	} else {
+		IncrRollup.addKeyToBatch(key, 1)
 	}
 }
@@ -546,8 +543,8 @@ func (ml *MemoryLayer) updateItemInCache(key string, pk x.ParsedKey, delta []byt
 		x.Check(p.Unmarshal(delta))
 		val.list.setMutationAfterCommit(startTs, commitTs, p, true)
 		checkForRollup([]byte(key), val.list)
+		//fmt.Println("====Setting cache list", commitTs, pk, p, val.list.mutationMap, val.list.key)
 	}
-	//fmt.Println("====Setting cache list", commitTs, pk, p, val.list.mutationMap)
 }
 
 // RemoveCachedKeys will delete the cached list by this txn.
@@ -591,6 +588,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 	// lists ended up being rolled-up multiple times. This issue was caught by the
 	// uid-set Jepsen test.
pk, err := x.Parse(key) + //fmt.Println("READING KEY", key, pk) if err != nil { return nil, errors.Wrapf(err, "while reading posting list with key [%v]", key) } @@ -607,6 +605,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { l := new(List) l.key = key l.plist = new(pb.PostingList) + l.minTs = 0 // We use the following block of code to trigger incremental rollup on this key. deltaCount := 0 @@ -635,14 +634,13 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { switch item.UserMeta() { case BitEmptyPosting: - l.minTs = item.Version() return l, nil case BitCompletePosting: if err := unmarshalOrCopy(l.plist, item); err != nil { return nil, err } - l.minTs = item.Version() + l.minTs = item.Version() // No need to do Next here. The outer loop can take care of skipping // more versions of the same key. return l, nil @@ -679,9 +677,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { } it.Next() } - if l.mutationMap != nil { - l.mutationMap = newMutableMap() - } return l, nil } @@ -705,78 +700,104 @@ func (c *CachePL) Set(l *List, readTs uint64) { } func ShouldGoInCache(pk x.ParsedKey) bool { - return !pk.IsData() + return true } func PostingListCacheEnabled() bool { return lCache != nil } -func (ml *MemoryLayer) saveInCache(key []byte, keyHash, readTs uint64, l *List) { - ml.LockKey(keyHash) - defer ml.UnlockKey(keyHash) - l.RLock() - cacheItem, ok := ml.get(keyHash) +func (ml *MemoryLayer) Process(i *setItems) { + ml.LockKey(i.keyHash) + i.list.RLock() + cacheItem, ok := ml.get(i.keyHash) if !ok { cacheItemNew := NewCachePL() cacheItemNew.count = 1 - cacheItemNew.list = copyList(l) - cacheItemNew.lastUpdate = l.maxTs - ml.set(keyHash, cacheItemNew) + cacheItemNew.list = copyList(i.list) + cacheItemNew.lastUpdate = i.list.maxTs + ml.set(i.keyHash, cacheItemNew) } else { // Only set l to the cache if readTs >= latestTs, which implies that l is // the latest version of the PL. We also check that we're reading a version // from Badger, which is higher than the write registered by the cache. 
//fmt.Println("====Setting cache", readTs, pk, l.mutationMap) - cacheItem.Set(copyList(l), readTs) + cacheItem.Set(copyList(i.list), i.readTs) + } + ml.numCacheSave += 1 + //allV, _ := i.list.AllValues(i.readTs) + //uids, _ := i.list.Uids(ListOptions{ReadTs: i.readTs}) + //fmt.Println("====Setting into cache", i.readTs, i.list.key, i.list.mutationMap, allV, uids) + i.list.RUnlock() + + idx := int(i.keyHash % uint64(numShards)) + if len(ml.shards[idx].data) > 500 { + for keyHash, pl := range ml.shards[idx].data { + if pl.lastRead < i.readTs-100 { + delete(ml.shards[idx].data, keyHash) + } + } } - l.RUnlock() - //allV, _ := l.AllValues(readTs) - //uids, _ := l.Uids(ListOptions{ReadTs: readTs}) - //fmt.Println("====Getting from disk", readTs, pk, l.mutationMap, allV, uids) + + ml.UnlockKey(i.keyHash) +} + +func (sm *MemoryLayer) deleteOldItems(ts uint64) { + fmt.Println("Deleting old items", sm.numCacheRead, sm.numDisksRead, sm.numCacheSave, sm.numCacheReadFails, float64(sm.numCacheRead)/float64(sm.numDisksRead)) + lb := 0 + la := 0 + t1 := time.Now() + defer func() { + fmt.Println("Done deleting old items", lb, la, time.Since(t1)) + }() + for i := 0; i < numShards; i++ { + sm.shards[i].Lock() + lb += len(sm.shards[i].data) + for keyHash, pl := range sm.shards[i].data { + if pl.lastRead < ts-500 { + delete(sm.shards[i].data, keyHash) + } + } + la += len(sm.shards[i].data) + sm.shards[i].Unlock() + } +} + +func (ml *MemoryLayer) saveInCache(keyHash, readTs uint64, l *List) { + ml.Process(&setItems{ + keyHash: keyHash, + readTs: readTs, + list: l, + }) } func (ml *MemoryLayer) readFromCache(key []byte, keyHash, readTs uint64) *List { - ml.LockKey(keyHash) + ml.RLockKey(keyHash) cacheItem, ok := ml.get(keyHash) - if !ok { - cacheItem = NewCachePL() - //fmt.Println("====Setting empty cache", readTs, pk) - ml.set(keyHash, cacheItem) - } - cacheItem.count += 1 if ok { + cacheItem.count += 1 + cacheItem.lastRead = x.Max(cacheItem.lastRead, readTs) if cacheItem.list != nil && cacheItem.list.minTs <= readTs { cacheItem.list.RLock() lCopy := copyList(cacheItem.list) cacheItem.list.RUnlock() - ml.UnlockKey(keyHash) + ml.RUnlockKey(keyHash) checkForRollup(key, lCopy) //allV, _ := lCopy.AllValues(readTs) //uids, _ := lCopy.Uids(ListOptions{ReadTs: readTs}) - //fmt.Println("====Getting cache", readTs, pk, lCopy.mutationMap, allV, uids) + //fmt.Println("====Getting cache", readTs, lCopy.key, lCopy.mutationMap, allV, uids) return lCopy } } - ml.UnlockKey(keyHash) + ml.RUnlockKey(keyHash) return nil } -func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - pk, _ := x.Parse(key) - var keyHash uint64 - - if ShouldGoInCache(pk) { - keyHash = z.MemHash(key) - l := ml.readFromCache(key, keyHash, readTs) - if l != nil { - return l, nil - } - } - +func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + ml.numDisksRead += 1 txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() @@ -789,18 +810,49 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (* defer itr.Close() itr.Seek(key) l, err := ReadPostingList(key, itr) + //fmt.Println("=============GETTING DISK", key, l.mutationMap, l.plist) if err != nil { return l, err } + return l, nil +} + +func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + pk, _ := x.Parse(key) + var keyHash uint64 - if ShouldGoInCache(pk) { - ml.saveInCache(key, keyHash, readTs, l) + gic := ShouldGoInCache(pk) + if gic { + 
+func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
+	pk, _ := x.Parse(key)
+	var keyHash uint64
 
-	if ShouldGoInCache(pk) {
-		ml.saveInCache(key, keyHash, readTs, l)
+	gic := ShouldGoInCache(pk)
+	if gic {
+		keyHash = z.MemHash(key)
+		l := ml.readFromCache(key, keyHash, readTs)
+		if l != nil {
+			//fmt.Println(pk, pk.IsData())
+			atomic.AddInt64(&ml.numCacheRead, 1)
+			return l, nil
+		} else {
+			atomic.AddInt64(&ml.numCacheReadFails, 1)
+		}
+		l, err := ml.readFromDisk(key, pstore, math.MaxUint64)
+		//fmt.Println("READING FROM DISK", l.minTs, readTs)
+		if err != nil {
+			return nil, err
+		}
+		ml.saveInCache(keyHash, readTs, l)
+		if l.minTs == 0 || readTs >= l.minTs {
+			return l, nil
+		}
+	}
+
+	l, err := ml.readFromDisk(key, pstore, readTs)
+	if err != nil {
+		return nil, err
 	}
 	return l, nil
 }
 
 func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
+	//fmt.Println("Get new", key)
 	if PostingListCacheEnabled() {
 		l, ok := lCache.Get(key)
 		if ok && l != nil {
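Reviewer note: `deleteOldItems` and the per-shard size check in `Process` both evict cache entries whose `lastRead` watermark trails the current timestamp by a fixed window. A self-contained sketch of that policy (the `entry` type and window value are illustrative):

```go
package main

import "fmt"

type entry struct{ lastRead uint64 }

// evictStale drops entries not read within the last `window` timestamps.
func evictStale(shard map[uint64]*entry, nowTs uint64) {
	const window = 500
	if nowTs <= window {
		return // guard against uint64 underflow for small timestamps
	}
	for k, e := range shard {
		if e.lastRead < nowTs-window {
			delete(shard, k)
		}
	}
}

func main() {
	shard := map[uint64]*entry{1: {lastRead: 100}, 2: {lastRead: 900}}
	evictStale(shard, 1000)
	fmt.Println(len(shard)) // 1: only the recently read entry survives
}
```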
diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go
index 0d0b61e47f2..02b35579a1f 100644
--- a/posting/mvcc_test.go
+++ b/posting/mvcc_test.go
@@ -19,6 +19,7 @@ package posting
 import (
 	"context"
 	"math"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -89,12 +90,66 @@ func TestCacheAfterDeltaUpdateRecieved(t *testing.T) {
 	// Read key at timestamp 10. Make sure cache is not updated by this, as there is a later read.
 	l, err := GetNoStore(key, 10)
 	require.NoError(t, err)
-	require.Equal(t, l.mutationMap.len(), 0)
+	require.Equal(t, l.mutationMap.listLen(10), 0)
 
 	// Read at 20 should show the value
 	l1, err := GetNoStore(key, 20)
 	require.NoError(t, err)
-	require.Equal(t, l1.mutationMap.len(), 1)
+	require.Equal(t, l1.mutationMap.listLen(20), 1)
+}
+
+func BenchmarkTestCache(b *testing.B) {
+	//lCache, _ = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{
+	//	// Use 5% of cache memory for storing counters.
+	//	NumCounters: int64(1000 * (1 << 20) * 0.05 * 2),
+	//	MaxCost:     int64(1000 * (1 << 20) * 0.95),
+	//	BufferItems: 64,
+	//	Metrics:     true,
+	//	Cost: func(val *List) int64 {
+	//		return 0
+	//	},
+	//})
+
+	attr := x.GalaxyAttr("cache")
+	keys := make([][]byte, 0)
+	N := 10000
+	txn := Oracle().RegisterStartTs(1)
+
+	for i := 1; i < N; i++ {
+		key := x.DataKey(attr, uint64(i))
+		keys = append(keys, key)
+		edge := &pb.DirectedEdge{
+			ValueId: 2,
+			Attr:    attr,
+			Entity:  1,
+			Op:      pb.DirectedEdge_SET,
+		}
+		l, _ := GetNoStore(key, 1)
+		// No index entries added here as we do not call AddMutationWithIndex.
+		txn.cache.SetIfAbsent(string(l.key), l)
+		err := l.addMutation(context.Background(), txn, edge)
+		if err != nil {
+			panic(err)
+		}
+	}
+	txn.Update()
+	writer := NewTxnWriter(pstore)
+	err := txn.CommitToDisk(writer, 2)
+	if err != nil {
+		panic(err)
+	}
+	writer.Flush()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			key := keys[rand.Intn(N-1)]
+			// Use a per-goroutine err; assigning to the shared `err` from
+			// parallel goroutines would be a data race.
+			if _, err := getNew(key, pstore, math.MaxUint64); err != nil {
+				panic(err)
+			}
+		}
+	})
 }
 
 func TestRollupTimestamp(t *testing.T) {
@@ -154,7 +209,7 @@ func TestPostingListRead(t *testing.T) {
 	require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6))
 	require.NoError(t, writer.Flush())
 	// Delete the key from cache as we have just updated it
-	globalCache.Del(z.MemHash(key))
+	memoryLayer.Del(z.MemHash(key))
 	assertLength(7, 0)
 
 	addEdgeToUID(t, attr, 1, 4, 7, 8)
@@ -167,7 +222,7 @@ func TestPostingListRead(t *testing.T) {
 	writer = NewTxnWriter(pstore)
 	require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10))
 	require.NoError(t, writer.Flush())
-	globalCache.Del(z.MemHash(key))
+	memoryLayer.Del(z.MemHash(key))
 	assertLength(10, 0)
 
 	addEdgeToUID(t, attr, 1, 5, 11, 12)
diff --git a/x/keys.go b/x/keys.go
index b55f0830e45..f297f535f68 100644
--- a/x/keys.go
+++ b/x/keys.go
@@ -307,7 +307,7 @@ func (p ParsedKey) String() string {
 	} else if p.IsCountOrCountRev() {
 		return fmt.Sprintf("UID: %v, Attr: %v, IsCount/Ref: true, Count: %v", p.Uid, p.Attr, p.Count)
 	} else {
-		return fmt.Sprintf("UID: %v, Attr: %v, Data key", p.Uid, p.Attr)
+		return fmt.Sprintf("UID: %v, Attr: %v, Data key, prefix: %v, byte: %v", p.Uid, p.Attr, p.bytePrefix, p.ByteType)
 	}
 }
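Reviewer note: the `TestReadSingleValue` change earlier in this diff stores each delta at its commit timestamp (`i+1`) rather than its start timestamp. A toy illustration of why that matters under MVCC: a reader at `readTs` must see exactly the versions committed at or before `readTs` (illustrative code, not the dgraph API):

```go
package main

import "fmt"

// visibleVersions returns the commit timestamps a reader at readTs may see.
func visibleVersions(commits []uint64, readTs uint64) []uint64 {
	var out []uint64
	for _, commitTs := range commits {
		if commitTs <= readTs {
			out = append(out, commitTs)
		}
	}
	return out
}

func main() {
	commits := []uint64{2, 4, 6} // deltas keyed by their commit timestamps
	fmt.Println(visibleVersions(commits, 5)) // [2 4]: the ts=6 write is not yet visible
	// Had the deltas been keyed by start timestamps (1, 3, 5) instead, a
	// reader at ts=5 would wrongly observe the third version before commit.
}
```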