Skip to content

Commit

Permalink
planner: refactor some code of binding cache (#58532)
Browse files Browse the repository at this point in the history
ref #51347
  • Loading branch information
qw4990 authored Dec 25, 2024
1 parent 94cf40a commit 3ba6df4
Show file tree
Hide file tree
Showing 11 changed files with 22 additions and 390 deletions.
3 changes: 0 additions & 3 deletions pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/hack",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/parser",
"//pkg/util/sqlexec",
"//pkg/util/stringutil",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pkg_errors//:errors",
"@org_golang_x_sync//singleflight",
"@org_uber_go_zap//:zap",
],
)
Expand Down
103 changes: 4 additions & 99 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,16 @@
package bindinfo

import (
"errors"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/pkg/bindinfo/internal/logutil"
"github.com/pingcap/tidb/pkg/bindinfo/norm"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/stringutil"
"go.uber.org/zap"
)

// GetBindingReturnNil is only for test
var GetBindingReturnNil = stringutil.StringerStr("GetBindingReturnNil")

// GetBindingReturnNilBool is only for test
var GetBindingReturnNilBool atomic.Bool

// GetBindingReturnNilAlways is only for test
var GetBindingReturnNilAlways = stringutil.StringerStr("getBindingReturnNilAlways")

// LoadBindingNothing is only for test
var LoadBindingNothing = stringutil.StringerStr("LoadBindingNothing")

// digestBiMap represents a bidirectional map between noDBDigest and sqlDigest, used to support cross-db binding.
// One noDBDigest can map to multiple sqlDigests, but one sqlDigest can only map to one noDBDigest.
type digestBiMap interface {
Expand Down Expand Up @@ -168,12 +148,9 @@ type BindingCache interface {
type bindingCache struct {
digestBiMap digestBiMap // mapping between noDBDigest and sqlDigest, used to support cross-db binding.
cache *ristretto.Cache // the underlying cache to store the bindings.

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)
}

func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)) BindingCache {
func newBindCache() BindingCache {
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e6,
MaxCost: variable.MemQuotaBindingCache.Load(),
Expand All @@ -185,100 +162,28 @@ func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) ([
IgnoreInternalCost: true,
})
c := bindingCache{
cache: cache,
digestBiMap: newDigestBiMap(),
loadBindingFromStorageFunc: bindingLoad,
cache: cache,
digestBiMap: newDigestBiMap(),
}
return &c
}

func (c *bindingCache) shouldMetric() bool {
return c.loadBindingFromStorageFunc != nil // only metric for GlobalBindingCache, whose loadBindingFromStorageFunc is not nil.
}

func (c *bindingCache) MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool) {
matchedBinding, isMatched, missingSQLDigest := c.getFromMemory(sctx, noDBDigest, tableNames)
if len(missingSQLDigest) == 0 {
if c.shouldMetric() && isMatched {
metrics.BindingCacheHitCounter.Inc()
}
return
}
if c.shouldMetric() {
metrics.BindingCacheMissCounter.Inc()
}
if c.loadBindingFromStorageFunc == nil {
return
}
c.loadFromStore(sctx, missingSQLDigest) // loadFromStore's SetBinding has a Mutex inside, so it's safe to call it without lock
matchedBinding, isMatched, _ = c.getFromMemory(sctx, noDBDigest, tableNames)
return
}

func (c *bindingCache) getFromMemory(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool, missingSQLDigest []string) {
if c.Size() == 0 {
return
}
possibleBindings := make([]*Binding, 0, 2)
for _, sqlDigest := range c.digestBiMap.NoDBDigest2SQLDigest(noDBDigest) {
binding := c.GetBinding(sqlDigest)
if intest.InTest {
if sctx.Value(GetBindingReturnNil) != nil {
if GetBindingReturnNilBool.CompareAndSwap(false, true) {
binding = nil
}
}
if sctx.Value(GetBindingReturnNilAlways) != nil {
binding = nil
}
}
if binding != nil {
possibleBindings = append(possibleBindings, binding)
} else {
missingSQLDigest = append(missingSQLDigest, sqlDigest)
}
}
if len(missingSQLDigest) != 0 {
return
// TODO: handle cache miss safely.
}
matchedBinding, isMatched = crossDBMatchBindings(sctx, tableNames, possibleBindings)
return
}

func (c *bindingCache) loadFromStore(sctx sessionctx.Context, missingSQLDigest []string) {
if intest.InTest && sctx.Value(LoadBindingNothing) != nil {
return
}
defer func(start time.Time) {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("loading binding from storage takes " + time.Since(start).String()))
}(time.Now())

for _, sqlDigest := range missingSQLDigest {
start := time.Now()
bindings, err := c.loadBindingFromStorageFunc(sctx, sqlDigest)
if err != nil {
logutil.BindLogger().Warn("failed to load binding from storage",
zap.String("sqlDigest", sqlDigest),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
continue
}
// put binding into the cache
oldBinding := c.GetBinding(sqlDigest)
cachedBinding := pickCachedBinding(oldBinding, bindings...)
if cachedBinding != nil {
err = c.SetBinding(sqlDigest, cachedBinding)
if err != nil {
// When the memory capacity of bing_cache is not enough,
// there will be some memory-related errors in multiple places.
// Only needs to be handled once.
logutil.BindLogger().Warn("update binding cache error", zap.Error(err))
}
}
}
}

// GetBinding gets the Bindings from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
Expand Down
4 changes: 2 additions & 2 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func bindingNoDBDigest(t *testing.T, b *Binding) string {
}

func TestCrossDBBindingCache(t *testing.T) {
fbc := newBindCache(nil).(*bindingCache)
fbc := newBindCache().(*bindingCache)
b1 := &Binding{BindSQL: "SELECT * FROM db1.t1", SQLDigest: "b1"}
fDigest1 := bindingNoDBDigest(t, b1)
b2 := &Binding{BindSQL: "SELECT * FROM db2.t1", SQLDigest: "b2"}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestBindCache(t *testing.T) {
variable.MemQuotaBindingCache.Store(v)
}(variable.MemQuotaBindingCache.Load())
variable.MemQuotaBindingCache.Store(int64(kvSize*3) - 1)
bindCache := newBindCache(nil)
bindCache := newBindCache()
defer bindCache.Close()

err := bindCache.SetBinding("digest1", binding)
Expand Down
89 changes: 8 additions & 81 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
utilparser "github.com/pingcap/tidb/pkg/util/parser"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

// GlobalBindingHandle is used to handle all global sql bind operations.
Expand All @@ -67,9 +66,6 @@ type GlobalBindingHandle interface {

// Methods for load and clear global sql bindings.

// Reset is to reset the BindHandle and clean old info.
Reset()

// LoadFromStorageToCache loads global bindings from storage to the memory cache.
LoadFromStorageToCache(fullLoad bool) (err error)

Expand All @@ -78,17 +74,14 @@ type GlobalBindingHandle interface {

// Methods for memory control.

// SetBindingCacheCapacity reset the capacity for the bindingCache.
SetBindingCacheCapacity(capacity int64)

// GetMemUsage returns the memory usage for the bind cache.
GetMemUsage() (memUsage int64)

// GetMemCapacity returns the memory capacity for the bind cache.
GetMemCapacity() (memCapacity int64)

// Close closes the binding cache.
CloseCache()
// Close closes the binding handler.
Close()

variable.Statistics
}
Expand All @@ -102,9 +95,6 @@ type globalBindingHandle struct {
// lastTaskTime records the last update time for the global sql bind cache.
// This value is used to avoid reload duplicated bindings from storage.
lastUpdateTime atomic.Value

// syncBindingSingleflight is used to synchronize the execution of `LoadFromStorageToCache` method.
syncBindingSingleflight singleflight.Group
}

// Lease influences the duration of loading bind info and handling invalid bind.
Expand Down Expand Up @@ -132,16 +122,11 @@ const (

// NewGlobalBindingHandle creates a new GlobalBindingHandle.
func NewGlobalBindingHandle(sPool util.SessionPool) GlobalBindingHandle {
handle := &globalBindingHandle{sPool: sPool}
handle.Reset()
return handle
}

// Reset is to reset the BindHandle and clean old info.
func (h *globalBindingHandle) Reset() {
h := &globalBindingHandle{sPool: sPool}
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.bindingCache = newBindCache(h.LoadBindingsFromStorage)
h.bindingCache = newBindCache()
variable.RegisterStatistics(h)
return h
}

func (h *globalBindingHandle) getLastUpdateTime() types.Time {
Expand Down Expand Up @@ -410,12 +395,6 @@ func (h *globalBindingHandle) GetAllGlobalBindings() (bindings []*Binding) {
return h.bindingCache.GetAllBindings()
}

// SetBindingCacheCapacity reset the capacity for the bindingCache.
// It will not affect already cached Bindings.
func (h *globalBindingHandle) SetBindingCacheCapacity(capacity int64) {
h.bindingCache.SetMemCapacity(capacity)
}

// GetMemUsage returns the memory usage for the bind cache.
func (h *globalBindingHandle) GetMemUsage() (memUsage int64) {
return h.bindingCache.GetMemUsage()
Expand Down Expand Up @@ -609,62 +588,10 @@ func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]any, er
return m, nil
}

// Close closes the binding cache.
func (h *globalBindingHandle) CloseCache() {
// Close closes the binding handler.
func (h *globalBindingHandle) Close() {
h.bindingCache.Close()
}

// LoadBindingsFromStorageToCache loads global bindings from storage to the memory cache.
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error) {
if sqlDigest == "" {
return nil, nil
}
timeout := time.Duration(sctx.GetSessionVars().LoadBindingTimeout) * time.Millisecond
resultChan := h.syncBindingSingleflight.DoChan(sqlDigest, func() (any, error) {
return h.loadBindingsFromStorageInternal(sqlDigest)
})
select {
case result := <-resultChan:
if result.Err != nil {
return nil, result.Err
}
bindings := result.Val
if bindings == nil {
return nil, nil
}
return bindings.([]*Binding), nil
case <-time.After(timeout):
return nil, errors.New("load bindings from storage timeout")
}
}

func (h *globalBindingHandle) loadBindingsFromStorageInternal(sqlDigest string) (any, error) {
failpoint.Inject("load_bindings_from_storage_internal_timeout", func() {
time.Sleep(time.Second)
})
var bindings []*Binding
selectStmt := fmt.Sprintf("SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info where sql_digest = '%s'", sqlDigest)
err := h.callWithSCtx(false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
bindings = make([]*Binding, 0, len(rows))
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
_, binding, err := newBinding(sctx, row)
if err != nil {
logutil.BindLogger().Warn("failed to generate bind record from data row", zap.Error(err))
continue
}
bindings = append(bindings, binding)
}
return nil
})
return bindings, err
h.sPool.Close()
}

// exec is a helper function to execute sql and return RecordSet.
Expand Down
2 changes: 1 addition & 1 deletion pkg/bindinfo/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 19,
shard_count = 18,
deps = [
"//pkg/bindinfo",
"//pkg/bindinfo/internal",
Expand Down
Loading

0 comments on commit 3ba6df4

Please sign in to comment.