Skip to content

Commit

Permalink
Merge pull request #7479 from vmg/lfu-fixes
Browse files Browse the repository at this point in the history
LFU Cache Fixes
  • Loading branch information
systay authored Feb 12, 2021
2 parents 31a43f5 + e1d8a81 commit a577cc4
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 121 deletions.
8 changes: 4 additions & 4 deletions dev.env
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# No shebang line as this script is sourced from an external shell.

# Copyright 2019 The Vitess Authors.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -34,6 +34,6 @@ export PATH

# According to https://github.com/etcd-io/etcd/blob/a621d807f061e1dd635033a8d6bc261461429e27/Documentation/op-guide/supported-platform.md,
# currently, etcd is unstable on arm64, so ETCD_UNSUPPORTED_ARCH should be set.
if [ "$(uname -m)" == aarch64 ]; then
if [ "$(uname -m)" = aarch64 ]; then
export ETCD_UNSUPPORTED_ARCH=arm64
fi
2 changes: 1 addition & 1 deletion go/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,5 @@ type Config struct {
var DefaultConfig = &Config{
MaxEntries: 5000,
MaxMemoryUsage: 32 * 1024 * 1024,
LFU: false,
LFU: true,
}
73 changes: 7 additions & 66 deletions go/cache/ristretto/bloom/bbloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
package bloom

import (
"bytes"
"encoding/json"
"log"
"math"
"unsafe"
)
Expand All @@ -43,26 +40,16 @@ func getSize(ui64 uint64) (size uint64, exponent uint64) {
return size, exponent
}

func calcSizeByWrongPositives(numEntries, wrongs float64) (uint64, uint64) {
size := -1 * numEntries * math.Log(wrongs) / math.Pow(float64(0.69314718056), 2)
locs := math.Ceil(float64(0.69314718056) * size / numEntries)
return uint64(size), uint64(locs)
// NewBloomFilterWithErrorRate returns a new bloomfilter with optimal size for the given
// error rate
func NewBloomFilterWithErrorRate(numEntries uint64, wrongs float64) *Bloom {
size := -1 * float64(numEntries) * math.Log(wrongs) / math.Pow(0.69314718056, 2)
locs := math.Ceil(0.69314718056 * size / float64(numEntries))
return NewBloomFilter(uint64(size), uint64(locs))
}

// NewBloomFilter returns a new bloomfilter.
func NewBloomFilter(params ...float64) (bloomfilter *Bloom) {
var entries, locs uint64
if len(params) == 2 {
if params[1] < 1 {
entries, locs = calcSizeByWrongPositives(params[0], params[1])
} else {
entries, locs = uint64(params[0]), uint64(params[1])
}
} else {
log.Fatal("usage: New(float64(number_of_entries), float64(number_of_hashlocations))" +
" i.e. New(float64(1000), float64(3)) or New(float64(number_of_entries)," +
" float64(number_of_hashlocations)) i.e. New(float64(1000), float64(0.03))")
}
func NewBloomFilter(entries, locs uint64) (bloomfilter *Bloom) {
size, exponent := getSize(entries)
bloomfilter = &Bloom{
sizeExp: exponent,
Expand Down Expand Up @@ -162,49 +149,3 @@ func (bl *Bloom) IsSet(idx uint64) bool {
r := ((*(*uint8)(ptr)) >> (idx % 8)) & 1
return r == 1
}

// bloomJSONImExport
// Im/Export structure used by JSONMarshal / JSONUnmarshal
type bloomJSONImExport struct {
FilterSet []byte
SetLocs uint64
}

// NewWithBoolset takes a []byte slice and number of locs per entry,
// returns the bloomfilter with a bitset populated according to the input []byte.
func newWithBoolset(bs *[]byte, locs uint64) *Bloom {
bloomfilter := NewBloomFilter(float64(len(*bs)<<3), float64(locs))
for i, b := range *bs {
*(*uint8)(unsafe.Pointer(uintptr(unsafe.Pointer(&bloomfilter.bitset[0])) + uintptr(i))) = b
}
return bloomfilter
}

// JSONUnmarshal takes JSON-Object (type bloomJSONImExport) as []bytes
// returns bloom32 / bloom64 object.
func JSONUnmarshal(dbData []byte) (*Bloom, error) {
bloomImEx := bloomJSONImExport{}
if err := json.Unmarshal(dbData, &bloomImEx); err != nil {
return nil, err
}
buf := bytes.NewBuffer(bloomImEx.FilterSet)
bs := buf.Bytes()
bf := newWithBoolset(&bs, bloomImEx.SetLocs)
return bf, nil
}

// JSONMarshal returns JSON-object (type bloomJSONImExport) as []byte.
func (bl Bloom) JSONMarshal() []byte {
bloomImEx := bloomJSONImExport{}
bloomImEx.SetLocs = bl.setLocs
bloomImEx.FilterSet = make([]byte, len(bl.bitset)<<3)
for i := range bloomImEx.FilterSet {
bloomImEx.FilterSet[i] = *(*byte)(unsafe.Pointer(uintptr(unsafe.Pointer(&bl.bitset[0])) +
uintptr(i)))
}
data, err := json.Marshal(bloomImEx)
if err != nil {
log.Fatal("json.Marshal failed: ", err)
}
return data
}
41 changes: 5 additions & 36 deletions go/cache/ristretto/bloom/bbloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/hack"
)

var (
wordlist1 [][]byte
n = 1 << 16
n = uint64(1 << 16)
bf *Bloom
)

Expand All @@ -31,7 +29,7 @@ func TestMain(m *testing.M) {
}

func TestM_NumberOfWrongs(t *testing.T) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)

cnt := 0
for i := range wordlist1 {
Expand All @@ -44,43 +42,14 @@ func TestM_NumberOfWrongs(t *testing.T) {

}

func TestM_JSON(t *testing.T) {
const shallBe = int(1 << 16)

bf = NewBloomFilter(float64(n*10), float64(7))

cnt := 0
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
if !bf.AddIfNotHas(hash) {
cnt++
}
}

jsonm := bf.JSONMarshal()

// create new bloomfilter from bloomfilter's JSON representation
bf2, err := JSONUnmarshal(jsonm)
require.NoError(t, err)

cnt2 := 0
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
if !bf2.AddIfNotHas(hash) {
cnt2++
}
}
require.Equal(t, shallBe, cnt2)
}

func BenchmarkM_New(b *testing.B) {
for r := 0; r < b.N; r++ {
_ = NewBloomFilter(float64(n*10), float64(7))
_ = NewBloomFilter(n*10, 7)
}
}

func BenchmarkM_Clear(b *testing.B) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)
for i := range wordlist1 {
hash := hack.RuntimeMemhash(wordlist1[i], 0)
bf.Add(hash)
Expand All @@ -92,7 +61,7 @@ func BenchmarkM_Clear(b *testing.B) {
}

func BenchmarkM_Add(b *testing.B) {
bf = NewBloomFilter(float64(n*10), float64(7))
bf = NewBloomFilter(n*10, 7)
b.ResetTimer()
for r := 0; r < b.N; r++ {
for i := range wordlist1 {
Expand Down
5 changes: 3 additions & 2 deletions go/cache/ristretto/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func defaultStringHash(key string) (uint64, uint64) {

type itemCallback func(*Item)

const itemSize = int64(unsafe.Sizeof(storeItem{}))
// CacheItemSize is the overhead in bytes for every stored cache item
const CacheItemSize = int64(unsafe.Sizeof(storeItem{}))

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
Expand Down Expand Up @@ -446,7 +447,7 @@ func (c *Cache) processItems() {
}
if !c.ignoreInternalCost {
// Add the cost of internally storing the object.
i.Cost += itemSize
i.Cost += CacheItemSize
}

switch i.flag {
Expand Down
2 changes: 1 addition & 1 deletion go/cache/ristretto/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ type tinyLFU struct {
func newTinyLFU(numCounters int64) *tinyLFU {
return &tinyLFU{
freq: newCmSketch(numCounters),
door: bloom.NewBloomFilter(float64(numCounters), 0.01),
door: bloom.NewBloomFilterWithErrorRate(uint64(numCounters), 0.01),
resetAt: numCounters,
}
}
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ const appendEntry = -1
type DB struct {
// Fields set at construction time.

// t is our testing.T instance
t *testing.T
// t is our testing.TB instance
t testing.TB

// listener is our mysql.Listener.
listener *mysql.Listener
Expand Down Expand Up @@ -151,7 +151,7 @@ type ExpectedExecuteFetch struct {
}

// New creates a server, and starts listening.
func New(t *testing.T) *DB {
func New(t testing.TB) *DB {
// Pick a path for our socket.
socketDir, err := ioutil.TempDir("", "fakesqldb")
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/cache/ristretto"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
Expand Down Expand Up @@ -178,9 +179,8 @@ func TestConsolidatorReplicasOnly(t *testing.T) {

func TestQueryPlanCache(t *testing.T) {
if cache.DefaultConfig.LFU {
const cacheItemSize = 40
const cachedPlanSize = 2275 + cacheItemSize
const cachePlanSize2 = 2254 + cacheItemSize
const cachedPlanSize = 2352 + int(ristretto.CacheItemSize)
const cachePlanSize2 = 2326 + int(ristretto.CacheItemSize)
testQueryPlanCache(t, cachedPlanSize, cachePlanSize2)
} else {
testQueryPlanCache(t, 1, 1)
Expand All @@ -203,7 +203,7 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) {
client := framework.NewClient()
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
_, _ = client.Execute("select * from vitess_test where intval=:ival2", bindVars)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, framework.Server.QueryPlanCacheLen())

vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 1)
Expand All @@ -212,13 +212,14 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) {

framework.Server.SetQueryPlanCacheCap(64 * 1024)
_, _ = client.Execute("select * from vitess_test where intval=:ival1", bindVars)
time.Sleep(100 * time.Millisecond)
require.Equal(t, 2, framework.Server.QueryPlanCacheLen())

vend = framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 2)
verifyIntValue(t, vend, "QueryCacheSize", cachedPlanSize*2)

_, _ = client.Execute("select * from vitess_test where intval=1", bindVars)
time.Sleep(100 * time.Millisecond)
require.Equal(t, 3, framework.Server.QueryPlanCacheLen())

vend = framework.DebugVars()
verifyIntValue(t, vend, "QueryCacheLength", 3)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/endtoend/queries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package endtoend

import (
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1783,6 +1784,10 @@ func TestQueries(t *testing.T) {
},
},
}

// Wait for the vtgate caches to flush
time.Sleep(1 * time.Second)

for _, tcase := range testCases {
if err := tcase.Test("", client); err != nil {
t.Error(err)
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,12 @@ func (qe *QueryEngine) QueryPlanCacheCap() int {
return int(qe.plans.MaxCapacity())
}

// QueryPlanCacheLen returns the length (size in entries) of the query cache
func (qe *QueryEngine) QueryPlanCacheLen() int {
qe.plans.Wait()
return qe.plans.Len()
}

// AddStats adds the given stats for the planName.tableName
func (qe *QueryEngine) AddStats(planName, tableName string, queryCount int64, duration, mysqlTime time.Duration, rowCount, errorCount int64) {
// table names can contain "." characters, replace them!
Expand Down
Loading

0 comments on commit a577cc4

Please sign in to comment.