From c051c2f5d0b7d3c2c8b2d7f195f22d4f12e0c6c4 Mon Sep 17 00:00:00 2001
From: Ahsan Barkati
Date: Wed, 25 Aug 2021 12:13:16 +0530
Subject: [PATCH] opt(cache): Use Ristretto to store posting lists (#7995)

- Use some nifty techniques to make Ristretto caching work with MVCC posting lists.
- Use the posting list cache by default.
- Set max pending to 64 by default.

This PR significantly improves query performance. Coupled with Roaring
Bitmaps, it has been shown to improve query latency by 20x for 50% of user
queries, and by 100x for 25% of them.

Co-authored-by: Manish R Jain
---
 go.mod                 |   4 +-
 go.sum                 |   7 +-
 posting/index.go       |   8 ++
 posting/list.go        |   2 +
 posting/list_test.go   |  43 ++++++---
 posting/lists.go       |  40 ++++++--
 posting/mvcc.go        | 211 ++++++++++++++++++++---------------------
 posting/oracle.go      |  14 ++-
 worker/draft.go        |   9 +-
 worker/server_state.go |   4 +-
 worker/snapshot.go     |   2 +
 11 files changed, 204 insertions(+), 140 deletions(-)

diff --git a/go.mod b/go.mod
index f2967ab739e..fbd4e99bea3 100644
--- a/go.mod
+++ b/go.mod
@@ -19,12 +19,12 @@ require (
 	github.com/Shopify/sarama v1.27.2
 	github.com/blevesearch/bleve v1.0.13
 	github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
-	github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902
+	github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e
 	github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987
 	github.com/dgraph-io/gqlgen v0.13.2
 	github.com/dgraph-io/gqlparser/v2 v2.2.0
 	github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
-	github.com/dgraph-io/ristretto v0.1.0
+	github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a
 	github.com/dgraph-io/simdjson-go v0.3.0
 	github.com/dgraph-io/sroar v0.0.0-20210816194026-bc614dc5ce67
 	github.com/dgrijalva/jwt-go v3.2.0+incompatible
diff --git a/go.sum b/go.sum
index 8e0c17854a4..e8bbd45bdfb 100644
--- a/go.sum
+++ b/go.sum
@@ -167,8 +167,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
 github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
-github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902 h1:Hk1GPDKdIAZD4TC8tiqjvwmYsicKI8URu0RIb/uFwDU=
-github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
+github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e h1:lugmhvI1tMal0wKW0g5uxIRHUqXpE5y1lgq/vm/UP/8=
+github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
 github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987 h1:5aN6H88a2q3HkO8vSZxDlgjEpJf4Fz8rfy+/Wzx2uAc=
 github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
 github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
@@ -178,8 +178,9 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0 h1:fKSCW8OxoMogjDwUhO9OrFvrgIA0UZspTDbc
 github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
-github.com/dgraph-io/ristretto 
v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= +github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a h1:2+hTlwc5yG4WAUXCoKWT/JJ11g8J1Q70in9abzFW7EQ= +github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0= github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY= github.com/dgraph-io/sroar v0.0.0-20210816194026-bc614dc5ce67 h1:IVHiU4jfB86QzsbwtSxtP6q/CsB9gOB69i1AFu9TiIk= diff --git a/posting/index.go b/posting/index.go index 3e10552b7e2..23da5a3c9cc 100644 --- a/posting/index.go +++ b/posting/index.go @@ -1215,11 +1215,13 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error { // DeleteAll deletes all entries in the posting list. func DeleteAll() error { + ResetCache() return pstore.DropAll() } // DeleteData deletes all data for the namespace but leaves types and schema intact. func DeleteData(ns uint64) error { + ResetCache() prefix := make([]byte, 9) prefix[0] = x.DefaultPrefix binary.BigEndian.PutUint64(prefix[1:], ns) @@ -1230,6 +1232,8 @@ func DeleteData(ns uint64) error { // based on DB options set. func DeletePredicate(ctx context.Context, attr string, ts uint64) error { glog.Infof("Dropping predicate: [%s]", attr) + // TODO: We should only delete cache for certain keys, not all the keys. + ResetCache() prefix := x.PredicatePrefix(attr) if err := pstore.DropPrefix(prefix); err != nil { return err @@ -1241,6 +1245,8 @@ func DeletePredicate(ctx context.Context, attr string, ts uint64) error { // writes. func DeletePredicateBlocking(ctx context.Context, attr string, ts uint64) error { glog.Infof("Dropping predicate: [%s]", attr) + // TODO: We should only delete cache for certain keys, not all the keys. + ResetCache() prefix := x.PredicatePrefix(attr) if err := pstore.DropPrefixBlocking(prefix); err != nil { return err @@ -1250,6 +1256,8 @@ func DeletePredicateBlocking(ctx context.Context, attr string, ts uint64) error // DeleteNamespace bans the namespace and deletes its predicates/types from the schema. func DeleteNamespace(ns uint64) error { + // TODO: We should only delete cache for certain keys, not all the keys. + ResetCache() schema.State().DeletePredsForNs(ns) return pstore.BanNamespace(ns) } diff --git a/posting/list.go b/posting/list.go index 649bccb065d..1bda7fa342c 100644 --- a/posting/list.go +++ b/posting/list.go @@ -1215,6 +1215,8 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) { } if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 { + // In case there were splits, this would read all the splits from + // Badger. 
if err := l.encode(out, readTs, split); err != nil { return nil, errors.Wrapf(err, "while encoding") } diff --git a/posting/list_test.go b/posting/list_test.go index d88956898d6..909babcd2dc 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -464,7 +464,7 @@ func TestMillion(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -906,7 +906,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -919,7 +919,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) { require.Equal(t, uint64(curTs+1), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.Nil(t, ol.plist.Bitmap) require.Equal(t, 0, len(ol.plist.Postings)) @@ -950,7 +950,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } commits++ @@ -1001,7 +1001,7 @@ func TestLargePlistSplit(t *testing.T) { _, err = ol.Rollup(nil) require.NoError(t, err) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) b = make([]byte, 5<<20) rand.Read(b) @@ -1020,7 +1020,7 @@ func TestLargePlistSplit(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.Nil(t, ol.plist.Bitmap) require.Equal(t, 0, len(ol.plist.Postings)) @@ -1071,6 +1071,21 @@ func writePostingListToDisk(kvs []*bpb.KV) error { return writer.Flush() } +func readPostingListFromDisk(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + txn := pstore.NewTransactionAt(readTs, false) + defer txn.Discard() + + // When we do rollups, an older version would go to the top of the LSM tree, which can cause + // issues during txn.Get. Therefore, always iterate. + iterOpts := badger.DefaultIteratorOptions + iterOpts.AllVersions = true + iterOpts.PrefetchValues = false + itr := txn.NewKeyIterator(key, iterOpts) + defer itr.Close() + itr.Seek(key) + return ReadPostingList(key, itr) +} + // Create a multi-part list and verify all the uids are there. func TestMultiPartListBasic(t *testing.T) { // TODO(sroar): Increase size to 1e5 once sroar is optimized. 
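The new readPostingListFromDisk helper deliberately avoids txn.Get: a rollup rewrites the posting list at a timestamp that can be older than the newest delta, so the most recently written LSM table may hold an older version of the key. The sketch below reproduces that situation against a throwaway managed Badger instance to show why iterating all versions is the safe read path; the directory, key, and timestamps are illustrative, not taken from Dgraph.

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/badger/v3"
)

func main() {
	db, err := badger.OpenManaged(badger.DefaultOptions("/tmp/badger-demo"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	key := []byte("uid-list")
	// Commit at ts=7 first, then at ts=3, mimicking a rollup that writes an
	// older version after a newer delta already exists.
	for _, ts := range []uint64{7, 3} {
		txn := db.NewTransactionAt(ts, true)
		if err := txn.SetEntry(badger.NewEntry(key, []byte(fmt.Sprintf("v%d", ts)))); err != nil {
			log.Fatal(err)
		}
		if err := txn.CommitAt(ts, nil); err != nil {
			log.Fatal(err)
		}
	}

	// Iterate all versions of the key, as readPostingListFromDisk does,
	// instead of trusting a single point lookup.
	txn := db.NewTransactionAt(10, false)
	defer txn.Discard()
	opts := badger.DefaultIteratorOptions
	opts.AllVersions = true
	itr := txn.NewKeyIterator(key, opts)
	defer itr.Close()
	for itr.Seek(key); itr.Valid(); itr.Next() {
		fmt.Println("saw version:", itr.Item().Version())
	}
}
```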
@@ -1122,7 +1137,7 @@ func TestBinSplit(t *testing.T) { require.Equal(t, uint64(size+2), kv.Version) } require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.Equal(t, 0, len(ol.plist.Splits)) require.Equal(t, size, len(ol.plist.Postings)) @@ -1285,7 +1300,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) { require.Equal(t, len(kvs), len(originalList.plist.Splits)+1) require.NoError(t, writePostingListToDisk(kvs)) - newList, err := getNew(kvs[0].Key, ps, math.MaxUint64) + newList, err := readPostingListFromDisk(kvs[0].Key, ps, math.MaxUint64) require.NoError(t, err) opt := ListOptions{ReadTs: math.MaxUint64} @@ -1354,7 +1369,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } curTs++ @@ -1383,7 +1398,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } curTs++ @@ -1394,7 +1409,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) for _, kv := range kvs { require.Equal(t, curTs, kv.Version) @@ -1423,7 +1438,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) } curTs++ @@ -1433,7 +1448,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) { kvs, err = ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) // Verify all entries are once again in the list. @@ -1510,7 +1525,7 @@ func TestRecursiveSplits(t *testing.T) { kvs, err := ol.Rollup(nil) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) - ol, err = getNew(key, ps, math.MaxUint64) + ol, err = readPostingListFromDisk(key, ps, math.MaxUint64) require.NoError(t, err) require.True(t, len(ol.plist.Splits) > 2) diff --git a/posting/lists.go b/posting/lists.go index 12319b7edb7..21764ee11e1 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -22,12 +22,11 @@ import ( "sync" "time" - ostats "go.opencensus.io/stats" - "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/dgo/v210/protos/api" "github.com/dgraph-io/ristretto" "github.com/dgraph-io/ristretto/z" + ostats "go.opencensus.io/stats" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" @@ -48,10 +47,12 @@ func Init(ps *badger.DB, cacheSize int64) { pstore = ps closer = z.NewCloser(1) go x.MonitorMemoryMetrics(closer) + // Initialize cache. if cacheSize == 0 { return } + var err error lCache, err = ristretto.NewCache(&ristretto.Config{ // Use 5% of cache memory for storing counters. 
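The NumCounters and MaxCost lines between these two hunks are unchanged context; the additions that follow give the cache a Cost function (a *List is charged its deep size, a bare uint64 timestamp costs 8 bytes) and a ShouldUpdate hook that refuses to move a key's value backwards in time. A toy sketch of the ShouldUpdate semantics, assuming the pinned Ristretto revision from go.mod above (stock v0.1.0 has no such hook):

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/ristretto"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,     // ~10x the number of expected items.
		MaxCost:     1 << 20, // 1 MB budget for this toy example.
		BufferItems: 64,
		Cost:        func(val interface{}) int64 { return 8 },
		// Reject any update that would move a key's timestamp backwards.
		ShouldUpdate: func(prev, cur interface{}) bool {
			return cur.(uint64) >= prev.(uint64)
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	cache.Set("key", uint64(10), 0)
	cache.Wait() // Drain the Set buffers so the write is visible.

	cache.Set("key", uint64(7), 0) // Stale write; ShouldUpdate rejects it.
	cache.Wait()

	val, _ := cache.Get("key")
	fmt.Println(val) // Prints 10, not 7.
}
```

Because Sets are buffered and applied asynchronously, monotonicity by timestamp, rather than arrival order, is what keeps the cache coherent under MVCC.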
@@ -60,11 +61,38 @@ func Init(ps *badger.DB, cacheSize int64) {
 		BufferItems: 64,
 		Metrics: true,
 		Cost: func(val interface{}) int64 {
-			l, ok := val.(*List)
-			if !ok {
-				return int64(0)
+			switch val.(type) {
+			case *List:
+				return int64(val.(*List).DeepSize())
+			case uint64:
+				return 8
+			default:
+				x.AssertTruef(false, "Don't know about type %T in Dgraph cache", val)
+				return 0
+			}
+		},
+		ShouldUpdate: func(prev, cur interface{}) bool {
+			var prevTs, curTs uint64
+			switch prev.(type) {
+			case *List:
+				prevTs = prev.(*List).maxTs
+			case uint64:
+				prevTs = prev.(uint64)
+			default:
+				x.AssertTruef(false, "Don't know about type %T in Dgraph cache", prev)
+			}
+
+			switch cur.(type) {
+			case *List:
+				curTs = cur.(*List).maxTs
+			case uint64:
+				curTs = cur.(uint64)
+			default:
+				x.AssertTruef(false, "Don't know about type %T in Dgraph cache", cur)
 			}
-			return int64(l.DeepSize())
+			// Only update the value if we have a timestamp >= the previous
+			// value.
+			return curTs >= prevTs
 		},
 	})
 	x.Check(err)
diff --git a/posting/mvcc.go b/posting/mvcc.go
index e3c33cbf823..78b03393d4d 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -18,7 +18,6 @@ package posting

 import (
 	"bytes"
-	"encoding/binary"
 	"encoding/hex"
 	"math"
 	"sort"
@@ -80,8 +79,8 @@ func init() {
 	}
 }

-// rollUpKey takes the given key's posting lists, rolls it up and writes back to badger
-func (ir *incrRollupi) rollUpKey(sl *skl.Skiplist, key []byte) error {
+// rollupKey takes the given key's posting list, rolls it up, and writes it back to Badger.
+func (ir *incrRollupi) rollupKey(sl *skl.Skiplist, key []byte) error {
 	l, err := GetNoStore(key, math.MaxUint64)
 	if err != nil {
 		return err
@@ -91,9 +90,11 @@ func (ir *incrRollupi) rollUpKey(sl *skl.Skiplist, key []byte) error {
 	if err != nil {
 		return err
 	}
-	// Clear the list from the cache after a rollup.
-	RemoveCacheFor(key)
+	// If we do a rollup, we typically won't need to update the key in cache.
+	// The only caveat is that the key written by rollup would be written at +1
+	// timestamp, hence bumping the latest TS for the key by 1. The cache should
+	// understand that.

 	const N = uint64(1000)
 	if glog.V(2) {
 		if count := atomic.AddUint64(&ir.count, 1); count%N == 0 {
@@ -180,7 +181,7 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
 			// Key not present or Key present but last roll up was more than 10 sec ago.
 			// Add/Update map and rollup.
 			m[hash] = currTs
-			if err := ir.rollUpKey(sl, key); err != nil {
+			if err := ir.rollupKey(sl, key); err != nil {
 				glog.Warningf("Error %v rolling up key %v\n", err, key)
 			}
 		}
@@ -202,6 +203,9 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
 			}
 		}
 	case <-baseTick.C:
+		// Pick up incomplete batches from the keysPool, and process them.
+		// This handles the infrequent-writes case, where a batch might take a
+		// long time to fill up.
 		batch := ir.priorityKeys[0].keysPool.Get().(*[][]byte)
 		if len(*batch) > 0 {
 			doRollup(batch, 0)
@@ -213,6 +217,7 @@ func (ir *incrRollupi) Process(closer *z.Closer) {
 			handover()
 		}
 	case batch := <-ir.priorityKeys[0].keysCh:
+		// P0 keys are high-priority keys: they have more than a threshold number of deltas.
 		doRollup(batch, 0)
 		// We don't need a limiter here as we don't expect to call this function frequently.
 	case batch := <-ir.priorityKeys[1].keysCh:
@@ -272,60 +277,6 @@ func (txn *Txn) FillContext(ctx *api.TxnContext, gid uint32) {
 	txn.cache.fillPreds(ctx, gid)
 }

-func (txn *Txn) ToBuffer(buf *z.Buffer, commitTs uint64) error {
-	if commitTs == 0 {
-		return nil
-	}
-
-	cache := txn.cache
-	cache.Lock()
-	defer cache.Unlock()
-
-	var keys []string
-	for key := range cache.deltas {
-		keys = append(keys, key)
-	}
-
-	defer func() {
-		// Add these keys to be rolled up after we're done writing. This is the right place for them
-		// to be rolled up, because we just pushed these deltas over to Badger.
-		for _, key := range keys {
-			IncrRollup.addKeyToBatch([]byte(key), 1)
-		}
-	}()
-
-	for _, key := range keys {
-		k := []byte(key)
-		data := cache.deltas[key]
-		if len(data) == 0 {
-			continue
-		}
-
-		if err := badger.ValidEntry(pstore, k, data); err != nil {
-			glog.Errorf("Invalid Entry. len(key): %d len(val): %d\n", len(k), len(data))
-			continue
-		}
-		if ts := cache.maxVersions[key]; ts >= commitTs {
-			// Skip write because we already have a write at a higher ts.
-			// Logging here can cause a lot of output when doing Raft log replay. So, let's
-			// not output anything here.
-			continue
-		}
-
-		key := y.KeyWithTs(k, commitTs)
-		val := y.ValueStruct{
-			Value: data,
-			UserMeta: BitDeltaPosting,
-		}
-
-		dst := buf.SliceAllocate(2 + len(key) + int(val.EncodedSize()))
-		binary.BigEndian.PutUint16(dst[:2], uint16(len(key)))
-		x.AssertTrue(len(key) == copy(dst[2:], key))
-		x.AssertTrue(uint32(len(dst)-2-len(key)) == val.Encode(dst[2+len(key):]))
-	}
-	return nil
-}
-
 // ToSkiplist replaces CommitToDisk. ToSkiplist creates a Badger usable Skiplist from the Txn, so
 // it can be passed over to Badger after commit. This only stores deltas to the commit timestamps.
 // It does not try to generate a state. State generation is done via rollups, which happen when a
@@ -341,14 +292,11 @@ func (txn *Txn) ToSkiplist() error {
 	}
 	sort.Strings(keys)

-	// defer func() {
-	// Add these keys to be rolled up after we're done writing. This is the right place for them
-	// to be rolled up, because we just pushed these deltas over to Badger.
-	// TODO: This is no longer the right place. Figure out a new place for these keys.
-	// for _, key := range keys {
-	// 	IncrRollup.addKeyToBatch([]byte(key), 1)
-	// }
-	// }()
+	// Add these keys to be rolled up after we're done writing them to Badger.
+	// Some full-text indices could easily gain hundreds of thousands of
+	// mutations while never being read. We do want to capture those cases.
+	// Update: we roll up the keys in oracle.DeleteTxnsAndRollupKeys, which is a
+	// callback that runs after the skiplist gets handed over to Badger.

 	b := skl.NewBuilder(1 << 10)
 	for _, key := range keys {
@@ -372,32 +320,21 @@ func (txn *Txn) ToSkiplist() error {
 	return nil
 }

-// ResetCache will clear all the cached list.
 func ResetCache() {
 	lCache.Clear()
 }

-// RemoveCacheFor will delete the list corresponding to the given key.
-func RemoveCacheFor(key []byte) {
-	// TODO: investigate if this can be done by calling Set with a nil value.
-	lCache.Del(key)
-}
-
-// RemoveCachedKeys will delete the cached list by this txn.
-func (txn *Txn) RemoveCachedKeys() {
+// UpdateCachedKeys bumps the cached timestamp for every key touched by this txn.
+func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
 	if txn == nil || txn.cache == nil {
 		return
 	}
+	x.AssertTrue(commitTs > 0)
 	for key := range txn.cache.deltas {
-		lCache.Del(key)
+		lCache.SetIfPresent([]byte(key), commitTs, 0)
 	}
 }

-func WaitForCache() {
-	// TODO Investigate if this is needed and why Jepsen tests fail with the cache enabled.
-	// lCache.Wait()
-}
-
 func unmarshalOrCopy(plist *pb.PostingList, item *badger.Item) error {
 	if plist == nil {
 		return errors.Errorf("cannot unmarshal value to a nil posting list of key %s",
@@ -518,27 +455,45 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
 	if pstore.IsClosed() {
 		return nil, badger.ErrDBClosed
 	}
-	// TODO: Fix this up later.
-	// cachedVal, ok := lCache.Get(key)
-	// if ok {
-	// 	l, ok := cachedVal.(*List)
-	// 	if ok && l != nil {
-	// 		// No need to clone the immutable layer or the key since mutations will not modify it.
-	// 		lCopy := &List{
-	// 			minTs: l.minTs,
-	// 			maxTs: l.maxTs,
-	// 			key:   key,
-	// 			plist: l.plist,
-	// 		}
-	// 		if l.mutationMap != nil {
-	// 			lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap))
-	// 			for ts, pl := range l.mutationMap {
-	// 				lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList)
-	// 			}
-	// 		}
-	// 		return lCopy, nil
-	// 	}
-	// }
+
+	var seenTs uint64
+	// We use badger subscription to invalidate the cache. For every write we set the value
+	// corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
+	// then it means that no writes have happened after the last set of this key in the cache.
+	if val, ok := lCache.Get(key); ok {
+		switch val.(type) {
+		case *List:
+			l := val.(*List)
+			// l.maxTs can be greater than readTs. We might have the latest
+			// version cached, while readTs is looking for an older version.
+			if l != nil && l.maxTs <= readTs {
+				l.RLock()
+				lCopy := copyList(l)
+				l.RUnlock()
+				return lCopy, nil
+			}
+
+		case uint64:
+			seenTs = val.(uint64)
+		}
+	} else {
+		// The key wasn't found in the cache. So, we set the key upfront. This
+		// gives it a chance to register in the cache, so it can capture any
+		// new writes coming from commits. Once we retrieve the value from
+		// Badger, we do an update if the key is already present in the
+		// cache.
+		// We must guarantee that the cache contains the latest version of the
+		// key. This mechanism avoids the following race condition:
+		// 1. We read from Badger at Ts 10.
+		// 2. New write comes in for the key at Ts 12. The key isn't in cache,
+		//    so this write doesn't get registered with the cache.
+		// 3. Cache sets the value read from Badger at Ts 10.
+		//
+		// With this Set-then-Update mechanism, before we read from Badger, we
+		// already set the key in the cache. So, any new writes coming in would
+		// get registered with the cache correctly, before we update the value.
+		lCache.Set(key, uint64(1), 0)
+	}

 	txn := pstore.NewTransactionAt(readTs, false)
 	defer txn.Discard()
@@ -550,11 +505,55 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
 	iterOpts.PrefetchValues = false
 	itr := txn.NewKeyIterator(key, iterOpts)
 	defer itr.Close()
-	itr.Seek(key)
+	latestTs := itr.Seek(key)
 	l, err := ReadPostingList(key, itr)
 	if err != nil {
 		return l, err
 	}
-	// lCache.Set(key, l, 0)
-	return l, nil
+	l.RLock()
+	// Rollup is useful to improve memory utilization in the cache and also for
+	// reads. However, in case the posting list is split, this would read all
+	// the parts and create a full PL. Not sure how much of an issue that is.
+	out, err := l.rollup(math.MaxUint64, false)
+	l.RUnlock()
+	if err != nil {
+		return nil, err
+	}
+
+	// We could consider writing this to Badger here, as we already have a
+	// rolled-up version. But doing the write to Badger here wouldn't be ideal.
+	// We write to Badger using Skiplists, instead of writing one entry at a
+	// time. 
In fact, rollups use getNew. So our cache here would get used by
+	// the rollup, hence achieving this optimization.
+
+	newList := func() *List {
+		return &List{
+			minTs: out.newMinTs,
+			maxTs: l.maxTs,
+			key:   l.key,
+			plist: out.plist,
+		}
+	}
+
+	// Only set l to the cache if readTs >= latestTs, which implies that l is
+	// the latest version of the PL. We also check that we're reading a version
+	// from Badger, which is higher than the write registered by the cache.
+	if readTs >= latestTs && latestTs >= seenTs {
+		lCache.SetIfPresent(key, newList(), 0)
+	}
+	return newList(), nil
+}
+
+func copyList(l *List) *List {
+	l.AssertRLock()
+	// No need to clone the immutable layer or the key since mutations will not modify it.
+	lCopy := &List{
+		minTs: l.minTs,
+		maxTs: l.maxTs,
+		key:   l.key,
+		plist: l.plist,
+	}
+	// We do a rollup before storing the PL in the cache.
+	x.AssertTrue(len(l.mutationMap) == 0)
+	return lCopy
 }
diff --git a/posting/oracle.go b/posting/oracle.go
index 9623888209c..caa6af7adc4 100644
--- a/posting/oracle.go
+++ b/posting/oracle.go
@@ -268,10 +268,18 @@ func (o *oracle) WaitForTs(ctx context.Context, startTs uint64) error {
 	}
 }

-func (o *oracle) DeleteTxns(delta *pb.OracleDelta) {
+// DeleteTxnsAndRollupKeys is called via a callback when the Skiplist is
+// handed over to Badger with the latest commits in it.
+func (o *oracle) DeleteTxnsAndRollupKeys(delta *pb.OracleDelta) {
 	o.Lock()
-	for _, txn := range delta.Txns {
-		delete(o.pendingTxns, txn.StartTs)
+	for _, status := range delta.Txns {
+		txn := o.pendingTxns[status.StartTs]
+		if txn != nil && status.CommitTs > 0 {
+			for k := range txn.Deltas() {
+				IncrRollup.addKeyToBatch([]byte(k), 0)
+			}
+		}
+		delete(o.pendingTxns, status.StartTs)
 	}
 	o.Unlock()
 }
diff --git a/worker/draft.go b/worker/draft.go
index 11c65554af2..f795fe72904 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -1076,7 +1076,7 @@ func (n *node) commitOrAbort(_ uint64, delta *pb.OracleDelta) error {
 	var sz int64
 	for _, status := range delta.Txns {
 		txn := posting.Oracle().GetTxn(status.StartTs)
-		if txn == nil {
+		if txn == nil || status.CommitTs == 0 {
 			continue
 		}
 		for k := range txn.Deltas() {
@@ -1109,7 +1109,7 @@ func (n *node) commitOrAbort(_ uint64, delta *pb.OracleDelta) error {
 	// This would be used for callback via Badger when skiplist is pushed to
 	// disk.
 	deleteTxns := func() {
-		posting.Oracle().DeleteTxns(delta)
+		posting.Oracle().DeleteTxnsAndRollupKeys(delta)
 	}

 	if len(itrs) == 0 {
@@ -1157,9 +1157,10 @@ func (n *node) commitOrAbort(_ uint64, delta *pb.OracleDelta) error {
-	// Clear all the cached lists that were touched by this transaction.
+	// Update the cached lists that were touched by this transaction.
 	for _, status := range delta.Txns {
 		txn := posting.Oracle().GetTxn(status.StartTs)
-		txn.RemoveCachedKeys()
+		if status.CommitTs > 0 {
+			txn.UpdateCachedKeys(status.CommitTs)
+		}
 	}
-	posting.WaitForCache()
 	span.Annotate(nil, "cache keys removed")

 	// Now advance Oracle(), so we can service waiting reads.
diff --git a/worker/server_state.go b/worker/server_state.go
index 7f5d0efb29d..314dc11bce1 100644
--- a/worker/server_state.go
+++ b/worker/server_state.go
@@ -40,14 +40,14 @@ const (
 	// breaks. 
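Before the config changes below, it is worth connecting the two halves of the cache protocol: getNew registers a key with a plain Set, while UpdateCachedKeys above bumps it with SetIfPresent, which only succeeds for keys that are already registered, so a commit on its own never populates the cache. A toy sketch of that behavior, again assuming the pinned Ristretto revision (which is what provides Cache.SetIfPresent):

```go
package main

import (
	"fmt"
	"log"

	"github.com/dgraph-io/ristretto"
)

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e4,
		MaxCost:     1 << 20,
		BufferItems: 64,
	})
	if err != nil {
		log.Fatal(err)
	}

	// No reader has registered "k", so a commit must not create an entry.
	cache.SetIfPresent("k", uint64(12), 0)
	cache.Wait()
	if _, ok := cache.Get("k"); !ok {
		fmt.Println("commit alone did not populate the cache")
	}

	// A reader registers the key first (as getNew does with uint64(1)),
	// after which the commit's timestamp bump is applied.
	cache.Set("k", uint64(1), 0)
	cache.Wait()
	cache.SetIfPresent("k", uint64(12), 0)
	cache.Wait()
	val, _ := cache.Get("k")
	fmt.Println("cached ts:", val) // 12
}
```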
AuditDefaults = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;` BadgerDefaults = `compression=snappy; numgoroutines=8;` - CacheDefaults = `size-mb=1024; percentage=0,65,35;` + CacheDefaults = `size-mb=1024; percentage=50,30,20;` CDCDefaults = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` + `client_key=; sasl-mechanism=PLAIN;` GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` + `lambda-url=;` LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` + `mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m;` + - `max-pending-queries=10000; max-retries=-1; shared-instance=false;` + `max-pending-queries=64; max-retries=-1; shared-instance=false;` RaftDefaults = `learner=false; snapshot-after-entries=10000; ` + `snapshot-after-duration=30m; pending-proposals=256; idx=; group=;` SecurityDefaults = `token=; whitelist=;` diff --git a/worker/snapshot.go b/worker/snapshot.go index 1850f65b158..e807f7f2d50 100644 --- a/worker/snapshot.go +++ b/worker/snapshot.go @@ -114,6 +114,8 @@ func (n *node) populateSnapshot(snap pb.Snapshot, pl *conn.Pool) error { if err := deleteStalePreds(ctx, done, snap.ReadTs); err != nil { return err } + // Reset the cache after having received a snapshot. + posting.ResetCache() glog.Infof("Snapshot writes DONE. Sending ACK") // Send an acknowledgement back to the leader.
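The two defaults changes in worker/server_state.go are what switch this machinery on: max-pending-queries drops from 10000 to 64 (the "max pending" bullet in the commit message), and percentage=50,30,20 gives the posting-list cache half of the size-mb budget where it previously got 0%, assuming the shares keep the same order as the old 0,65,35 default (posting-list cache, Badger block cache, Badger index cache). A quick check of the resulting budgets:

```go
package main

import "fmt"

func main() {
	const sizeMB = 1024 // size-mb default from CacheDefaults.

	// Assumed share order, matching the old default of 0,65,35:
	// posting-list cache, Badger block cache, Badger index cache.
	names := []string{"posting-list", "pstore-block", "pstore-index"}
	for i, pct := range []int{50, 30, 20} {
		fmt.Printf("%-12s cache: %4d MB\n", names[i], sizeMB*pct/100)
	}
}
```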