Skip to content

Commit

Permalink
executor: enable stats LFU cache (#46246)
Browse files Browse the repository at this point in the history
ref #46158
  • Loading branch information
hawkingrei authored Aug 22, 2023
1 parent cb248a1 commit 29b9448
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 5 deletions.
2 changes: 2 additions & 0 deletions executor/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func testSortInDisk(t *testing.T, removeDir bool) {
defer restore()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
conf.Performance.EnableStatsCacheMemQuota = true
})
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testSortedRowContainerSpill", "return(true)"))
defer func() {
Expand Down Expand Up @@ -97,6 +98,7 @@ func TestIssue16696(t *testing.T) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.TempStoragePath = t.TempDir()
conf.Performance.EnableStatsCacheMemQuota = true
})
alarmRatio := variable.MemoryUsageAlarmRatio.Load()
variable.MemoryUsageAlarmRatio.Store(0.0)
Expand Down
1 change: 1 addition & 0 deletions executor/test/admintest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestMain(m *testing.M) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
conf.Experimental.AllowsExpressionIndex = true
conf.Performance.EnableStatsCacheMemQuota = true
})
tikv.EnableFailpoints()

Expand Down
1 change: 1 addition & 0 deletions executor/test/aggregate/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
flaky = True,
shard_count = 39,
deps = [
"//config",
"//executor",
"//executor/internal",
"//parser/terror",
Expand Down
7 changes: 7 additions & 0 deletions executor/test/aggregate/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package aggregate
import (
"testing"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/testkit/testdata"
"github.com/pingcap/tidb/testkit/testsetup"
"go.uber.org/goleak"
Expand All @@ -29,6 +30,12 @@ func TestMain(m *testing.M) {
testsetup.SetupForCommonTest()
testDataMap.LoadTestSuiteData("testdata", "agg_suite")
aggMergeSuiteData = testDataMap["agg_suite"]
config.UpdateGlobal(func(conf *config.Config) {
conf.TiKVClient.AsyncCommit.SafeWindow = 0
conf.TiKVClient.AsyncCommit.AllowedClockDrift = 0
conf.Experimental.AllowsExpressionIndex = true
conf.Performance.EnableStatsCacheMemQuota = true
})
opts := []goleak.Option{
goleak.IgnoreTopFunction("github.com/golang/glog.(*fileSink).flushDaemon"),
goleak.IgnoreTopFunction("github.com/lestrrat-go/httprc.runFetchWorker"),
Expand Down
7 changes: 6 additions & 1 deletion executor/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -120,7 +121,11 @@ func TestEqualDatumsAsBinary(t *testing.T) {
{[]interface{}{1}, []interface{}{1, 1}, false},
{[]interface{}{nil}, []interface{}{1}, false},
}
base := exec.NewBaseExecutor(core.MockContext(), nil, 0)
ctx := core.MockContext()
base := exec.NewBaseExecutor(ctx, nil, 0)
defer func() {
domain.GetDomain(ctx).StatsHandle().Close()
}()
e := &InsertValues{BaseExecutor: base}
for _, tt := range tests {
res, err := e.equalDatumsAsBinary(types.MakeDatums(tt.a...), types.MakeDatums(tt.b...))
Expand Down
6 changes: 6 additions & 0 deletions statistics/handle/cache/internal/lfu/key_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ func (ks *keySet) Get(key int64) (*statistics.Table, bool) {
ks.mu.RUnlock()
return value, ok
}

func (ks *keySet) Clear() {
ks.mu.Lock()
ks.set = make(map[int64]*statistics.Table)
ks.mu.Unlock()
}
6 changes: 6 additions & 0 deletions statistics/handle/cache/internal/lfu/key_set_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,9 @@ func (kss *keySetShard) Len() int {
}
return result
}

func (kss *keySetShard) Clear() {
for idx := range kss.resultKeySet {
kss.resultKeySet[idx].Clear()
}
}
15 changes: 13 additions & 2 deletions statistics/handle/cache/internal/lfu/lfu_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package lfu

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/ristretto"
Expand All @@ -31,6 +32,7 @@ type LFU struct {
cache *ristretto.Cache
resultKeySet *keySetShard
cost atomic.Int64
closeOnce sync.Once
}

// NewLFU creates a new LFU cache.
Expand Down Expand Up @@ -187,6 +189,15 @@ func (s *LFU) metrics() *ristretto.Metrics {

// Close implements statsCacheInner
func (s *LFU) Close() {
s.cache.Close()
s.cache.Wait()
s.closeOnce.Do(func() {
s.Clear()
s.cache.Close()
s.cache.Wait()
})
}

// Clear implements statsCacheInner
func (s *LFU) Clear() {
s.cache.Clear()
s.resultKeySet.Clear()
}
5 changes: 4 additions & 1 deletion statistics/handle/cache/statscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ func (s *StatsCachePointer) Load() *StatsCache {

// Replace replaces the cache with the new cache.
func (s *StatsCachePointer) Replace(newCache *StatsCache) {
s.Store(newCache)
old := s.Swap(newCache)
if old != nil {
old.Close()
}
metrics.CostGauge.Set(float64(newCache.Cost()))
}

Expand Down
1 change: 0 additions & 1 deletion statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ func (sc *StatsCache) SetCapacity(c int64) {
// Close stops the cache.
func (sc *StatsCache) Close() {
sc.c.Close()
logutil.BgLogger().Info("closed LFU cache")
}

// Version returns the version of the current cache, which is defined as
Expand Down

0 comments on commit 29b9448

Please sign in to comment.