Skip to content

Commit

Permalink
opt(cache): Use Ristretto to store posting lists (#7995)
Browse files Browse the repository at this point in the history
- Use some nifty techniques to make Ristretto caching work with MVCC posting lists.
- Use posting list cache by default.
- Set max pending to 64 by default.

This PR significantly improves query performance. This, coupled with Roaring Bitmaps, has been shown to improve query latency by 20x for 50% of user queries, and 100x for 25% of them.

Co-authored-by: Manish R Jain <manish@dgraph.io>
  • Loading branch information
ahsanbarkati and manishrjain authored Aug 25, 2021
1 parent 3504044 commit c051c2f
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 140 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ require (
github.com/Shopify/sarama v1.27.2
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e
github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.0
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/ristretto v0.1.0
github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgraph-io/sroar v0.0.0-20210816194026-bc614dc5ce67
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4=
github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902 h1:Hk1GPDKdIAZD4TC8tiqjvwmYsicKI8URu0RIb/uFwDU=
github.com/dgraph-io/badger/v3 v3.0.0-20210707084205-c40b2e9af902/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e h1:lugmhvI1tMal0wKW0g5uxIRHUqXpE5y1lgq/vm/UP/8=
github.com/dgraph-io/badger/v3 v3.0.0-20210825061050-c2b23c471f5e/go.mod h1:dULbq6ehJ5K0cGW/1TQ9iSfUk0gbSiToDWmWmTsJ53E=
github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987 h1:5aN6H88a2q3HkO8vSZxDlgjEpJf4Fz8rfy+/Wzx2uAc=
github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987/go.mod h1:dCzdThGGTPYOAuNtrM6BiXj/86voHn7ZzkPL6noXR3s=
github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM=
Expand All @@ -178,8 +178,9 @@ github.com/dgraph-io/gqlparser/v2 v2.2.0 h1:fKSCW8OxoMogjDwUhO9OrFvrgIA0UZspTDbc
github.com/dgraph-io/gqlparser/v2 v2.2.0/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a h1:2+hTlwc5yG4WAUXCoKWT/JJ11g8J1Q70in9abzFW7EQ=
github.com/dgraph-io/ristretto v0.1.1-0.20210824115121-89e99415887a/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY=
github.com/dgraph-io/sroar v0.0.0-20210816194026-bc614dc5ce67 h1:IVHiU4jfB86QzsbwtSxtP6q/CsB9gOB69i1AFu9TiIk=
Expand Down
8 changes: 8 additions & 0 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,13 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// DeleteAll deletes all entries in the posting list store by dropping
// everything from the underlying Badger instance.
func DeleteAll() error {
// Invalidate the posting-list cache before dropping the data; entries
// cached earlier would otherwise serve data that DropAll removed.
ResetCache()
return pstore.DropAll()
}

// DeleteData deletes all data for the namespace but leaves types and schema intact.
func DeleteData(ns uint64) error {
ResetCache()
prefix := make([]byte, 9)
prefix[0] = x.DefaultPrefix
binary.BigEndian.PutUint64(prefix[1:], ns)
Expand All @@ -1230,6 +1232,8 @@ func DeleteData(ns uint64) error {
// based on DB options set.
func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
// TODO: We should only delete cache for certain keys, not all the keys.
ResetCache()
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefix(prefix); err != nil {
return err
Expand All @@ -1241,6 +1245,8 @@ func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
// writes.
func DeletePredicateBlocking(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
// TODO: We should only delete cache for certain keys, not all the keys.
ResetCache()
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefixBlocking(prefix); err != nil {
return err
Expand All @@ -1250,6 +1256,8 @@ func DeletePredicateBlocking(ctx context.Context, attr string, ts uint64) error

// DeleteNamespace bans the namespace and deletes its predicates/types from the schema.
func DeleteNamespace(ns uint64) error {
// TODO: We should only delete cache for certain keys, not all the keys.
// Dropping the entire cache is a blunt but safe way to guarantee that no
// posting list belonging to this namespace is served after the ban.
ResetCache()
schema.State().DeletePredsForNs(ns)
return pstore.BanNamespace(ns)
}
2 changes: 2 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,8 @@ func (l *List) rollup(readTs uint64, split bool) (*rollupOutput, error) {
}

if len(out.plist.Splits) > 0 || len(l.mutationMap) > 0 {
// In case there were splits, this would read all the splits from
// Badger.
if err := l.encode(out, readTs, split); err != nil {
return nil, errors.Wrapf(err, "while encoding")
}
Expand Down
43 changes: 29 additions & 14 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func TestMillion(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand Down Expand Up @@ -906,7 +906,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand All @@ -919,7 +919,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
require.Equal(t, uint64(curTs+1), kv.Version)
}
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
require.Nil(t, ol.plist.Bitmap)
require.Equal(t, 0, len(ol.plist.Postings))
Expand Down Expand Up @@ -950,7 +950,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
commits++
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func TestLargePlistSplit(t *testing.T) {
_, err = ol.Rollup(nil)
require.NoError(t, err)

ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
b = make([]byte, 5<<20)
rand.Read(b)
Expand All @@ -1020,7 +1020,7 @@ func TestLargePlistSplit(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
require.Nil(t, ol.plist.Bitmap)
require.Equal(t, 0, len(ol.plist.Postings))
Expand Down Expand Up @@ -1071,6 +1071,21 @@ func writePostingListToDisk(kvs []*bpb.KV) error {
return writer.Flush()
}

// readPostingListFromDisk reads the posting list stored under key from the
// given Badger instance, merging all versions visible at readTs. It is a
// test helper that bypasses the posting-list cache entirely.
//
// The parameter is named db (not pstore) to avoid shadowing the
// package-level pstore variable.
func readPostingListFromDisk(key []byte, db *badger.DB, readTs uint64) (*List, error) {
	txn := db.NewTransactionAt(readTs, false)
	defer txn.Discard()

	// When we do rollups, an older version would go to the top of the LSM
	// tree, which can cause issues during txn.Get. Therefore, always iterate
	// over all versions of the key instead of doing a point lookup.
	iterOpts := badger.DefaultIteratorOptions
	iterOpts.AllVersions = true
	iterOpts.PrefetchValues = false
	itr := txn.NewKeyIterator(key, iterOpts)
	defer itr.Close()
	itr.Seek(key)
	return ReadPostingList(key, itr)
}

// Create a multi-part list and verify all the uids are there.
func TestMultiPartListBasic(t *testing.T) {
// TODO(sroar): Increase size to 1e5 once sroar is optimized.
Expand Down Expand Up @@ -1122,7 +1137,7 @@ func TestBinSplit(t *testing.T) {
require.Equal(t, uint64(size+2), kv.Version)
}
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
require.Equal(t, 0, len(ol.plist.Splits))
require.Equal(t, size, len(ol.plist.Postings))
Expand Down Expand Up @@ -1285,7 +1300,7 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
require.Equal(t, len(kvs), len(originalList.plist.Splits)+1)

require.NoError(t, writePostingListToDisk(kvs))
newList, err := getNew(kvs[0].Key, ps, math.MaxUint64)
newList, err := readPostingListFromDisk(kvs[0].Key, ps, math.MaxUint64)
require.NoError(t, err)

opt := ListOptions{ReadTs: math.MaxUint64}
Expand Down Expand Up @@ -1354,7 +1369,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
Expand Down Expand Up @@ -1383,7 +1398,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
Expand All @@ -1394,7 +1409,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
for _, kv := range kvs {
require.Equal(t, curTs, kv.Version)
Expand Down Expand Up @@ -1423,7 +1438,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
}
curTs++
Expand All @@ -1433,7 +1448,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
kvs, err = ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)

// Verify all entries are once again in the list.
Expand Down Expand Up @@ -1510,7 +1525,7 @@ func TestRecursiveSplits(t *testing.T) {
kvs, err := ol.Rollup(nil)
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
ol, err = getNew(key, ps, math.MaxUint64)
ol, err = readPostingListFromDisk(key, ps, math.MaxUint64)
require.NoError(t, err)
require.True(t, len(ol.plist.Splits) > 2)

Expand Down
40 changes: 34 additions & 6 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ import (
"sync"
"time"

ostats "go.opencensus.io/stats"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/dgo/v210/protos/api"
"github.com/dgraph-io/ristretto"
"github.com/dgraph-io/ristretto/z"
ostats "go.opencensus.io/stats"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -48,10 +47,12 @@ func Init(ps *badger.DB, cacheSize int64) {
pstore = ps
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
return
}

var err error
lCache, err = ristretto.NewCache(&ristretto.Config{
// Use 5% of cache memory for storing counters.
Expand All @@ -60,11 +61,38 @@ func Init(ps *badger.DB, cacheSize int64) {
BufferItems: 64,
Metrics: true,
Cost: func(val interface{}) int64 {
l, ok := val.(*List)
if !ok {
return int64(0)
switch val.(type) {
case *List:
return int64(val.(*List).DeepSize())
case uint64:
return 8
default:
x.AssertTruef(false, "Don't know about type %T in Dgraph cache", val)
return 0
}
},
ShouldUpdate: func(prev, cur interface{}) bool {
var prevTs, curTs uint64
switch prev.(type) {
case *List:
prevTs = prev.(*List).maxTs
case uint64:
prevTs = prev.(uint64)
default:
x.AssertTruef(false, "Don't know about type %T in Dgraph cache", prev)
}

switch cur.(type) {
case *List:
curTs = cur.(*List).maxTs
case uint64:
curTs = cur.(uint64)
default:
x.AssertTruef(false, "Don't know about type %T in Dgraph cache", cur)
}
return int64(l.DeepSize())
// Only update the value if we have a timestamp >= the previous
// value.
return curTs >= prevTs
},
})
x.Check(err)
Expand Down
Loading

0 comments on commit c051c2f

Please sign in to comment.