From b566e13674cb59a06961827b655164e4cacabac9 Mon Sep 17 00:00:00 2001
From: Harshil Goel
Date: Tue, 15 Oct 2024 19:24:25 +0530
Subject: [PATCH] refactor(posting): move the global posting-list cache behind a MemoryLayer type

---
 posting/list.go  |   3 +-
 posting/lists.go |   5 +-
 posting/mvcc.go  | 317 +++++++++++++++++++++++++----------------------
 3 files changed, 171 insertions(+), 154 deletions(-)

diff --git a/posting/list.go b/posting/list.go
index 1fc1dda3a87..5e51aad022d 100644
--- a/posting/list.go
+++ b/posting/list.go
@@ -872,7 +872,6 @@ func (l *List) setMutation(startTs uint64, data []byte) {
 		l.mutationMap = newMutableMap()
 	}
 	l.mutationMap.set(startTs, pl)
-
 	if pl.CommitTs != 0 {
 		l.maxTs = x.Max(l.maxTs, pl.CommitTs)
 	}
@@ -903,6 +902,7 @@ func (l *List) Iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
 // If greater than zero, this timestamp must thus be greater than l.minTs.
 func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) {
 	// This function would return zero ts for entries above readTs.
+	// Otherwise, the effective ts is the ts at which the posting became visible to this read.
 	effective := func(start, commit uint64) uint64 {
 		if commit > 0 && commit <= readTs {
 			// Has been committed and below the readTs.
@@ -1099,6 +1099,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb
 	var count int
 	var found bool
 	var post *pb.Posting
+
 	err := l.iterate(readTs, afterUid, func(p *pb.Posting) error {
 		if p.Uid == uid {
 			post = p
diff --git a/posting/lists.go b/posting/lists.go
index d089fd0a553..7a760135395 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -312,8 +312,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
 		}
 	} else {
 		pl = &List{
-			key:   key,
-			plist: new(pb.PostingList),
+			key:         key,
+			plist:       new(pb.PostingList),
+			mutationMap: newMutableMap(),
 		}
 	}
 
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 76b9986db34..c5e297520e9 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -20,12 +20,14 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/golang/glog"
+	"github.com/golang/protobuf/proto"
 	"github.com/pkg/errors"
 
 	"github.com/dgraph-io/badger/v4"
@@ -64,12 +66,6 @@ type CachePL struct {
 	lastUpdate uint64
 }
 
-type GlobalCache struct {
-	sync.RWMutex
-
-	items map[string]*CachePL
-}
-
 var (
 	// ErrTsTooOld is returned when a transaction is too old to be applied.
 	ErrTsTooOld = errors.Errorf("Transaction is too old")
@@ -84,11 +80,12 @@ var (
 		priorityKeys: make([]*pooledKeys, 2),
 	}
 
-	globalCache = newShardedMap()
+	memoryLayer = initMemoryLayer()
 
 	numShards = 256
 )
 
 func init() {
+	runtime.SetCPUProfileRate(200)
 	x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
 	for i := range IncrRollup.priorityKeys {
 		IncrRollup.priorityKeys[i] = &pooledKeys{
@@ -136,7 +133,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error {
 	RemoveCacheFor(key)
 	//pk, _ := x.Parse(key)
 	//fmt.Println("====Setting cache delete rollup", ts, pk)
-	globalCache.Del(z.MemHash(key))
+	memoryLayer.Del(z.MemHash(key))
 	// TODO Update cache with rolled up results
 	// If we do a rollup, we typically won't need to update the key in cache.
 	// The only caveat is that the key written by rollup would be written at +1
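Note: memoryLayer above is the renamed globalCache/shardedMap; the hunks that follow carry out the rename. As a reference for the pattern, here is a minimal, self-contained sketch of hash-sharded locking. All names (shardedCache, cacheItem, hashKey) are illustrative, and FNV stands in for z.MemHash; this is not Dgraph's actual code.

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numShards = 256

// cacheItem is a toy stand-in for CachePL.
type cacheItem struct {
	lastUpdate uint64 // commit ts of the last write seen for this key
}

// shard guards one slice of the keyspace with its own RWMutex.
type shard struct {
	sync.RWMutex
	items map[uint64]*cacheItem
}

type shardedCache struct {
	shards [numShards]*shard
}

func newShardedCache() *shardedCache {
	sc := &shardedCache{}
	for i := range sc.shards {
		sc.shards[i] = &shard{items: make(map[uint64]*cacheItem)}
	}
	return sc
}

// hashKey is a stand-in for z.MemHash.
func hashKey(key []byte) uint64 {
	h := fnv.New64a()
	h.Write(key)
	return h.Sum64()
}

func (sc *shardedCache) Set(key []byte, it *cacheItem) {
	kh := hashKey(key)
	s := sc.shards[kh%numShards] // pick the shard by key hash
	s.Lock()
	defer s.Unlock()
	s.items[kh] = it
}

func (sc *shardedCache) Get(key []byte) (*cacheItem, bool) {
	kh := hashKey(key)
	s := sc.shards[kh%numShards]
	s.RLock()
	defer s.RUnlock()
	it, ok := s.items[kh]
	return it, ok
}

func main() {
	sc := newShardedCache()
	sc.Set([]byte("0x01-name"), &cacheItem{lastUpdate: 42})
	if it, ok := sc.Get([]byte("0x01-name")); ok {
		fmt.Println(it.lastUpdate) // 42
	}
}

Sharding by hash confines contention to 1/256th of the keyspace, which is what lets the patch hold a per-key lock across get/set pairs without serializing unrelated readers.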
@@ -221,7 +218,7 @@ func (ir *incrRollupi) Process(closer *z.Closer, getNewTs func(bool) uint64) {
 				}
 			}
 		case <-deleteCacheTick.C:
-			globalCache.deleteOldItems(ir.getNewTs(false))
+			memoryLayer.deleteOldItems(ir.getNewTs(false))
 		case <-forceRollupTick.C:
 			batch := ir.priorityKeys[0].keysPool.Get().(*[][]byte)
 			if len(*batch) > 0 {
@@ -349,7 +346,7 @@ func ResetCache() {
 	if lCache != nil {
 		lCache.Clear()
 	}
-	globalCache.Clear()
+	memoryLayer.Clear()
 }
 
 // RemoveCacheFor will delete the list corresponding to the given key.
@@ -360,12 +357,12 @@ func RemoveCacheFor(key []byte) {
 	}
 }
 
-type shardedMap struct {
+type MemoryLayer struct {
 	shards []*lockedMap
 }
 
-func newShardedMap() *shardedMap {
-	sm := &shardedMap{
+func initMemoryLayer() *MemoryLayer {
+	sm := &MemoryLayer{
 		shards: make([]*lockedMap, numShards),
 	}
 	for i := range sm.shards {
@@ -374,15 +371,11 @@ func newShardedMap() *shardedMap {
 	return sm
 }
 
-func (sm *shardedMap) Get(key uint64) (*CachePL, bool) {
-	return sm.shards[key%uint64(numShards)].Get(key)
-}
-
-func (sm *shardedMap) get(key uint64) (*CachePL, bool) {
+func (sm *MemoryLayer) get(key uint64) (*CachePL, bool) {
 	return sm.shards[key%uint64(numShards)].get(key)
 }
 
-func (sm *shardedMap) set(key uint64, i *CachePL) {
+func (sm *MemoryLayer) set(key uint64, i *CachePL) {
 	if i == nil {
 		// If item is nil make this Set a no-op.
 		return
@@ -391,7 +384,15 @@ func (sm *shardedMap) set(key uint64, i *CachePL) {
 	sm.shards[key%uint64(numShards)].set(key, i)
 }
 
-func (sm *shardedMap) Set(key uint64, i *CachePL) {
+func (sm *MemoryLayer) del(key uint64) {
+	sm.shards[key%uint64(numShards)].del(key)
+}
+
+func (sm *MemoryLayer) Get(key uint64) (*CachePL, bool) {
+	return sm.shards[key%uint64(numShards)].Get(key)
+}
+
+func (sm *MemoryLayer) Set(key uint64, i *CachePL) {
 	if i == nil {
 		// If item is nil make this Set a no-op.
 		return
@@ -400,37 +401,33 @@ func (sm *shardedMap) Set(key uint64, i *CachePL) {
 	sm.shards[key%uint64(numShards)].Set(key, i)
 }
 
-func (sm *shardedMap) del(key uint64) {
-	sm.shards[key%uint64(numShards)].del(key)
-}
-
-func (sm *shardedMap) Del(key uint64) {
+func (sm *MemoryLayer) Del(key uint64) {
 	sm.shards[key%uint64(numShards)].Del(key)
 }
 
-func (sm *shardedMap) UnlockKey(key uint64) {
+func (sm *MemoryLayer) UnlockKey(key uint64) {
 	sm.shards[key%uint64(numShards)].Unlock()
 }
 
-func (sm *shardedMap) LockKey(key uint64) {
+func (sm *MemoryLayer) LockKey(key uint64) {
 	sm.shards[key%uint64(numShards)].Lock()
 }
 
-func (sm *shardedMap) RLockKey(key uint64) {
+func (sm *MemoryLayer) RLockKey(key uint64) {
 	sm.shards[key%uint64(numShards)].RLock()
 }
 
-func (sm *shardedMap) RUnlockKey(key uint64) {
+func (sm *MemoryLayer) RUnlockKey(key uint64) {
 	sm.shards[key%uint64(numShards)].RUnlock()
 }
 
-func (sm *shardedMap) Clear() {
+func (sm *MemoryLayer) Clear() {
 	for i := 0; i < numShards; i++ {
 		sm.shards[i].Clear()
 	}
}
 
-func (sm *shardedMap) deleteOldItems(ts uint64) {
+func (sm *MemoryLayer) deleteOldItems(ts uint64) {
 	fmt.Println("Deleting old items")
 	defer func() {
 		fmt.Println("Done deleting old items")
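Note: the next hunk introduces checkForRollup, which sends keys with more than 500 pending deltas to the high-priority rollup batch (priority 0) and other dirty keys to the low-priority batch (priority 1). A rough, self-contained sketch of that two-tier queueing, using channels as a stand-in for Dgraph's pooled key batches (the names rollupQueue, addKey, next are illustrative):

package main

import "fmt"

const highPriorityThreshold = 500 // mirrors the deltaCount > 500 check below

type rollupQueue struct {
	high chan []byte
	low  chan []byte
}

func newRollupQueue() *rollupQueue {
	return &rollupQueue{
		high: make(chan []byte, 1024),
		low:  make(chan []byte, 1024),
	}
}

// addKey routes a dirty key by its pending-delta count, like
// IncrRollup.addKeyToBatch(key, 0|1) in the patch.
func (q *rollupQueue) addKey(key []byte, deltaCount int) {
	if deltaCount > highPriorityThreshold {
		q.high <- key
	} else if deltaCount > 0 {
		q.low <- key
	}
}

// next prefers the high-priority queue, falling back to low.
func (q *rollupQueue) next() ([]byte, bool) {
	select {
	case k := <-q.high:
		return k, true
	default:
	}
	select {
	case k := <-q.low:
		return k, true
	default:
		return nil, false
	}
}

func main() {
	q := newRollupQueue()
	q.addKey([]byte("cold-key"), 3)
	q.addKey([]byte("hot-key"), 900)
	k, _ := q.next()
	fmt.Printf("%s\n", k) // hot-key drains first
}

Draining high before low means keys whose delta chains have grown long enough to slow every read get rolled up first.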
@@ -511,6 +508,48 @@ func NewCachePL() *CachePL {
 	}
 }
 
+func checkForRollup(key []byte, l *List) {
+	deltaCount := l.mutationMap.len()
+	if deltaCount > 0 {
+		// If deltaCount is high, send the key to the high-priority channel instead.
+		if deltaCount > 500 {
+			IncrRollup.addKeyToBatch(key, 0)
+		} else {
+			IncrRollup.addKeyToBatch(key, 1)
+		}
+	}
+}
+
+func (ml *MemoryLayer) updateItemInCache(key string, pk x.ParsedKey, delta []byte, startTs, commitTs uint64) {
+	if commitTs == 0 {
+		return
+	}
+
+	keyHash := z.MemHash([]byte(key))
+	// TODO under the same lock
+	ml.LockKey(keyHash)
+	defer ml.UnlockKey(keyHash)
+
+	val, ok := ml.get(keyHash)
+	if !ok {
+		val = NewCachePL()
+		val.lastUpdate = commitTs
+		ml.set(keyHash, val)
+		return
+	}
+
+	val.lastUpdate = commitTs
+	val.count -= 1
+
+	if val.list != nil {
+		p := new(pb.PostingList)
+		x.Check(p.Unmarshal(delta))
+		val.list.setMutationAfterCommit(startTs, commitTs, p, true)
+		checkForRollup([]byte(key), val.list)
+	}
+	//fmt.Println("====Setting cache list", commitTs, pk, p, val.list.mutationMap)
+}
+
 // RemoveCachedKeys will delete the cached list by this txn.
 func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
 	if txn == nil || txn.cache == nil {
@@ -519,57 +558,12 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
 	}
 	for key, delta := range txn.cache.deltas {
 		RemoveCacheFor([]byte(key))
+
 		pk, _ := x.Parse([]byte(key))
 		if !ShouldGoInCache(pk) {
 			continue
 		}
-		keyHash := z.MemHash([]byte(key))
-		// TODO under the same lock
-		globalCache.LockKey(keyHash)
-		val, ok := globalCache.get(keyHash)
-		if !ok {
-			val = NewCachePL()
-			val.lastUpdate = commitTs
-			globalCache.set(keyHash, val)
-		}
-		if commitTs != 0 {
-			// TODO Delete this if the values are too old in an async thread
-			val.lastUpdate = commitTs
-		}
-
-		if commitTs != 0 {
-			p := new(pb.PostingList)
-			x.Check(p.Unmarshal(delta))
-			//fmt.Println("====Committing ", txn.StartTs, commitTs, pk, p)
-		}
-
-		if !ok {
-			globalCache.UnlockKey(keyHash)
-			continue
-		}
-
-		val.count -= 1
-
-		if commitTs != 0 && val.list != nil {
-			p := new(pb.PostingList)
-			x.Check(p.Unmarshal(delta))
-			val.list.setMutationAfterCommit(txn.StartTs, commitTs, p, true)
-			//fmt.Println("====Setting cache list", commitTs, pk, p, val.list.mutationMap)
-		}
-
-		if val.list != nil {
-			deltaCount := len(val.list.mutationMap)
-			if deltaCount > 0 {
-				// If deltaCount is high, send it to high priority channel instead.
-				if deltaCount > 500 {
-					IncrRollup.addKeyToBatch([]byte(key), 0)
-				} else {
-					IncrRollup.addKeyToBatch([]byte(key), 1)
-				}
-			}
-		}
-
-		globalCache.UnlockKey(keyHash)
+		memoryLayer.updateItemInCache(key, pk, delta, txn.StartTs, commitTs)
 	}
 }
 
@@ -654,6 +648,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 		return l, nil
 	case BitDeltaPosting:
 		err := item.Value(func(val []byte) error {
+			if l.mutationMap == nil {
+				l.mutationMap = newMutableMap()
+			}
 			pl := &pb.PostingList{}
 			if err := pl.Unmarshal(val); err != nil {
 				return err
@@ -682,6 +679,9 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
 		}
 		it.Next()
 	}
+	if l.mutationMap == nil {
+		l.mutationMap = newMutableMap()
+	}
 	return l, nil
 }
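Note: the two hunks above make ReadPostingList initialize the mutable layer lazily when the first delta is seen, and guarantee a non-nil map on return (the trailing guard must be == nil; resetting a non-nil map would drop the deltas just read). The shape of that layered read, sketched with illustrative version/kind types rather than Badger's real iterator API:

package main

import "fmt"

type kind int

const (
	snapshot kind = iota // complete posting list (rollup output)
	delta                // incremental mutation
)

type version struct {
	commitTs uint64
	kind     kind
	data     string
}

type list struct {
	minTs     uint64
	immutable string            // the snapshot layer
	deltas    map[uint64]string // mutable layer, keyed by commit ts
}

// readList walks versions of a key, assumed newest-first, collecting
// deltas until a full snapshot folds in everything older.
func readList(versions []version) *list {
	l := &list{}
	for _, v := range versions {
		switch v.kind {
		case delta:
			if l.deltas == nil { // lazy init, as in the == nil guard above
				l.deltas = make(map[uint64]string)
			}
			l.deltas[v.commitTs] = v.data
		case snapshot:
			l.immutable = v.data
			l.minTs = v.commitTs
			return l // everything older is already folded in
		}
	}
	return l
}

func main() {
	l := readList([]version{
		{commitTs: 9, kind: delta, data: "+uid(3)"},
		{commitTs: 7, kind: delta, data: "+uid(2)"},
		{commitTs: 5, kind: snapshot, data: "uid(1)"},
	})
	fmt.Println(l.minTs, l.immutable, l.deltas)
}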
@@ -712,68 +712,69 @@ func PostingListCacheEnabled() bool {
 	return lCache != nil
 }
 
-func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
-	if PostingListCacheEnabled() {
-		l, ok := lCache.Get(key)
-		if ok && l != nil {
-			// No need to clone the immutable layer or the key since mutations will not modify it.
-			lCopy := &List{
-				minTs: l.minTs,
-				maxTs: l.maxTs,
-				key:   key,
-				plist: l.plist,
-			}
-			l.RLock()
-			lCopy.mutationMap = l.mutationMap.clone()
-			l.RUnlock()
-			return lCopy, nil
-		}
+func (ml *MemoryLayer) saveInCache(key []byte, keyHash, readTs uint64, l *List) {
+	ml.LockKey(keyHash)
+	defer ml.UnlockKey(keyHash)
+	l.RLock()
+	cacheItem, ok := ml.get(keyHash)
+	if !ok {
+		cacheItemNew := NewCachePL()
+		cacheItemNew.count = 1
+		cacheItemNew.list = copyList(l)
+		cacheItemNew.lastUpdate = l.maxTs
+		ml.set(keyHash, cacheItemNew)
+	} else {
+		// Only set l to the cache if readTs >= latestTs, which implies that l is
+		// the latest version of the PL. We also check that we're reading a version
+		// from Badger, which is higher than the write registered by the cache.
+
+		//fmt.Println("====Setting cache", readTs, pk, l.mutationMap)
+		cacheItem.Set(copyList(l), readTs)
 	}
+	l.RUnlock()
+	//allV, _ := l.AllValues(readTs)
+	//uids, _ := l.Uids(ListOptions{ReadTs: readTs})
+	//fmt.Println("====Getting from disk", readTs, pk, l.mutationMap, allV, uids)
+}
 
-	if pstore.IsClosed() {
-		return nil, badger.ErrDBClosed
+func (ml *MemoryLayer) readFromCache(key []byte, keyHash, readTs uint64) *List {
+	ml.LockKey(keyHash)
+
+	cacheItem, ok := ml.get(keyHash)
+	if !ok {
+		cacheItem = NewCachePL()
+		//fmt.Println("====Setting empty cache", readTs, pk)
+		ml.set(keyHash, cacheItem)
 	}
+	cacheItem.count += 1
+
+	if ok {
+		if cacheItem.list != nil && cacheItem.list.minTs <= readTs {
+			cacheItem.list.RLock()
+			lCopy := copyList(cacheItem.list)
+			cacheItem.list.RUnlock()
+			ml.UnlockKey(keyHash)
+			checkForRollup(key, lCopy)
+			//allV, _ := lCopy.AllValues(readTs)
+			//uids, _ := lCopy.Uids(ListOptions{ReadTs: readTs})
+			//fmt.Println("====Getting cache", readTs, pk, lCopy.mutationMap, allV, uids)
+			return lCopy
+		}
+	}
+	ml.UnlockKey(keyHash)
+	return nil
+}
+
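Note: saveInCache and readFromCache above always store and hand out deep copies taken under the entry's lock, so callers never share mutable state with the cache. A minimal sketch of that copy-on-read discipline, with a single mutex and toy types standing in for the sharded locks and posting.copyList:

package main

import (
	"fmt"
	"sync"
)

type postingList struct {
	minTs  uint64
	values []string
}

// copyList mirrors the role of copyList in the patch: deep-copy
// whatever readers might mutate.
func copyList(l *postingList) *postingList {
	c := &postingList{minTs: l.minTs, values: make([]string, len(l.values))}
	copy(c.values, l.values)
	return c
}

type listCache struct {
	mu    sync.Mutex
	items map[string]*postingList
}

// read returns a private copy if the cached version is visible at readTs.
func (c *listCache) read(key string, readTs uint64) *postingList {
	c.mu.Lock()
	defer c.mu.Unlock()
	l, ok := c.items[key]
	if !ok || l.minTs > readTs {
		return nil // absent, or too new for this read timestamp
	}
	return copyList(l)
}

// save stores a copy, so later mutations by the caller don't leak in.
func (c *listCache) save(key string, l *postingList) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = copyList(l)
}

func main() {
	c := &listCache{items: make(map[string]*postingList)}
	c.save("k", &postingList{minTs: 5, values: []string{"a"}})
	got := c.read("k", 10)
	got.values[0] = "mutated" // private copy; cache is unaffected
	fmt.Println(c.items["k"].values[0], got.values[0])
}

The cost is an extra copy per hit; the benefit is that the cache never has to chase references still held by readers.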
+func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
 	pk, _ := x.Parse(key)
 	var keyHash uint64
 	if ShouldGoInCache(pk) {
 		keyHash = z.MemHash(key)
-		globalCache.LockKey(keyHash)
-		cacheItem, ok := globalCache.get(keyHash)
-		if !ok {
-			cacheItem = NewCachePL()
-			// TODO see if this is reuqired
-			//fmt.Println("====Setting empty cache", readTs, pk)
-			globalCache.set(keyHash, cacheItem)
-		}
-		cacheItem.count += 1
-
-		// We use badger subscription to invalidate the cache. For every write we make the value
-		// corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
-		// then it means that no writes have happened after the last set of this key in the cache.
-		if ok {
-			if cacheItem.list != nil && cacheItem.list.minTs <= readTs {
-				cacheItem.list.RLock()
-				lCopy := copyList(cacheItem.list)
-				cacheItem.list.RUnlock()
-				globalCache.UnlockKey(keyHash)
-				//allV, _ := lCopy.AllValues(readTs)
-				//uids, _ := lCopy.Uids(ListOptions{ReadTs: readTs})
-				//fmt.Println("====Getting cache", readTs, pk, lCopy.mutationMap, allV, uids)
-				deltaCount := len(lCopy.mutationMap)
-				if deltaCount > 0 {
-					// If deltaCount is high, send it to high priority channel instead.
-					if deltaCount > 500 {
-						IncrRollup.addKeyToBatch(key, 0)
-					} else {
-						IncrRollup.addKeyToBatch(key, 1)
-					}
-				}
-				return lCopy, nil
-			}
+		l := ml.readFromCache(key, keyHash, readTs)
+		if l != nil {
+			return l, nil
 		}
-		globalCache.UnlockKey(keyHash)
 	}
 
 	txn := pstore.NewTransactionAt(readTs, false)
@@ -792,29 +793,43 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
 		return l, err
 	}
 
-	// Only set l to the cache if readTs >= latestTs, which implies that l is
-	// the latest version of the PL. We also check that we're reading a version
-	// from Badger, which is higher than the write registered by the cache.
 	if ShouldGoInCache(pk) {
-		globalCache.LockKey(keyHash)
-		l.RLock()
-		// TODO fix Get and Set to be under one lock
-		cacheItem, ok := globalCache.get(keyHash)
-		if !ok {
-			cacheItemNew := NewCachePL()
-			cacheItemNew.count = 1
-			cacheItemNew.list = copyList(l)
-			cacheItemNew.lastUpdate = l.maxTs
-			globalCache.set(keyHash, cacheItemNew)
-		} else {
-			//fmt.Println("====Setting cache", readTs, pk, l.mutationMap)
-			cacheItem.Set(copyList(l), readTs)
+		ml.saveInCache(key, keyHash, readTs, l)
+	}
+
+	return l, nil
+}
+
+func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
+	if PostingListCacheEnabled() {
+		l, ok := lCache.Get(key)
+		if ok && l != nil {
+			// No need to clone the immutable layer or the key since mutations will not modify it.
+			lCopy := &List{
+				minTs: l.minTs,
+				maxTs: l.maxTs,
+				key:   key,
+				plist: l.plist,
+			}
+			l.RLock()
+			if l.mutationMap != nil {
+				lCopy.mutationMap = newMutableMap()
+				for ts, pl := range l.mutationMap.oldList {
+					lCopy.mutationMap.oldList[ts] = proto.Clone(pl).(*pb.PostingList)
+				}
+			}
+			l.RUnlock()
+			return lCopy, nil
 		}
-		l.RUnlock()
-		//allV, _ := l.AllValues(readTs)
-		//uids, _ := l.Uids(ListOptions{ReadTs: readTs})
-		//fmt.Println("====Getting from disk", readTs, pk, l.mutationMap, allV, uids)
-		globalCache.UnlockKey(keyHash)
+	}
+
+	if pstore.IsClosed() {
+		return nil, badger.ErrDBClosed
+	}
+
+	l, err := memoryLayer.ReadData(key, pstore, readTs)
+	if err != nil {
+		return l, err
 	}
 
 	if PostingListCacheEnabled() {
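Note: with this patch, the read path of getNew becomes: posting-list cache (lCache) first, then memoryLayer.ReadData, which tries the memory layer and falls back to Badger with a write-back. A self-contained sketch of that layering, with function values standing in for the real lookups (all names here are illustrative):

package main

import (
	"errors"
	"fmt"
)

type list struct{ data string }

// readPath wires the three layers together; each field stands in for
// lCache.Get, readFromCache, the Badger read, and saveInCache.
type readPath struct {
	fromPostingCache func(key string) (*list, bool)
	fromMemoryLayer  func(key string, readTs uint64) *list
	fromDisk         func(key string, readTs uint64) (*list, error)
	saveInCache      func(key string, readTs uint64, l *list)
}

func (rp *readPath) getNew(key string, readTs uint64) (*list, error) {
	if l, ok := rp.fromPostingCache(key); ok {
		return l, nil // cheapest hit: the rolled-up posting list cache
	}
	if l := rp.fromMemoryLayer(key, readTs); l != nil {
		return l, nil // memory layer hit, already a private copy
	}
	l, err := rp.fromDisk(key, readTs) // fall back to Badger
	if err != nil {
		return nil, err
	}
	rp.saveInCache(key, readTs, l) // write back for the next reader
	return l, nil
}

func main() {
	rp := &readPath{
		fromPostingCache: func(string) (*list, bool) { return nil, false },
		fromMemoryLayer:  func(string, uint64) *list { return nil },
		fromDisk: func(key string, _ uint64) (*list, error) {
			if key == "" {
				return nil, errors.New("bad key")
			}
			return &list{data: "from disk"}, nil
		},
		saveInCache: func(string, uint64, *list) {},
	}
	l, _ := rp.getNew("0x01-name", 10)
	fmt.Println(l.data) // from disk
}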