Skip to content

Commit

Permalink
bindinfo: add timeout for loading binding from storage
Browse files Browse the repository at this point in the history
Signed-off-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
hawkingrei committed Mar 11, 2024
1 parent a45838c commit b898638
Show file tree
Hide file tree
Showing 16 changed files with 270 additions and 119 deletions.
52 changes: 37 additions & 15 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bindinfo
import (
"errors"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/bindinfo/norm"
Expand All @@ -36,7 +37,7 @@ import (
// fuzzy matching, loading binding if cache miss automatically (TODO).
type FuzzyBindingCache interface {
// FuzzyMatchingBinding supports fuzzy matching on bindings.
FuzzyMatchingBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (bindings Binding, isMatched bool)
FuzzyMatchingBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (bindings Binding, isMatched bool, warn error)

// Copy copies this cache.
Copy() (c FuzzyBindingCache, err error)
Expand All @@ -57,10 +58,10 @@ type fuzzyBindingCache struct {
sql2FuzzyDigest map[string]string // sqlDigest --> fuzzyDigest

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sqlDigest string) (Bindings, error)
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) (Bindings, error)
}

func newFuzzyBindingCache(loadBindingFromStorageFunc func(string) (Bindings, error)) FuzzyBindingCache {
func newFuzzyBindingCache(loadBindingFromStorageFunc func(sessionctx.Context, string) (Bindings, error)) FuzzyBindingCache {
return &fuzzyBindingCache{
BindingCache: newBindCache(),
fuzzy2SQLDigests: make(map[string][]string),
Expand All @@ -69,20 +70,27 @@ func newFuzzyBindingCache(loadBindingFromStorageFunc func(string) (Bindings, err
}
}

func (fbc *fuzzyBindingCache) FuzzyMatchingBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
matchedBinding, isMatched, missingSQLDigest := fbc.getFromMemory(sctx, fuzzyDigest, tableNames)
func (fbc *fuzzyBindingCache) FuzzyMatchingBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, warn error) {
matchedBinding, isMatched, missingSQLDigest := fbc.getFromMemory(sctx, fuzzyDigest, tableNames, true)
if len(missingSQLDigest) == 0 {
return
}
if fbc.loadBindingFromStorageFunc == nil {
return
}
fbc.loadFromStore(missingSQLDigest) // loadFromStore's SetBinding has a Mutex inside, so it's safe to call it without lock
matchedBinding, isMatched, _ = fbc.getFromMemory(sctx, fuzzyDigest, tableNames)
fbc.loadFromStore(sctx, missingSQLDigest) // loadFromStore's SetBinding has a Mutex inside, so it's safe to call it without lock
matchedBinding, isMatched, missingSQLDigest = fbc.getFromMemory(sctx, fuzzyDigest, tableNames, false)
if len(missingSQLDigest) != 0 {
if isMatched {
warn = errors.New("only part of the load was successful, resulting in some bindings not being effective")
} else {
warn = errors.New("failed to load bindings, optimization process without bindings")
}
}
return
}

func (fbc *fuzzyBindingCache) getFromMemory(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, missingSQLDigest []string) {
func (fbc *fuzzyBindingCache) getFromMemory(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName, enableFailpoint bool) (matchedBinding Binding, isMatched bool, missingSQLDigest []string) {
fbc.mu.RLock()
defer fbc.mu.RUnlock()
bindingCache := fbc.BindingCache
Expand All @@ -92,7 +100,16 @@ func (fbc *fuzzyBindingCache) getFromMemory(sctx sessionctx.Context, fuzzyDigest
leastWildcards := len(tableNames) + 1
enableFuzzyBinding := sctx.GetSessionVars().EnableFuzzyBinding
for _, sqlDigest := range fbc.fuzzy2SQLDigests[fuzzyDigest] {
if bindings := bindingCache.GetBinding(sqlDigest); bindings != nil {
bindings := bindingCache.GetBinding(sqlDigest)
if enableFailpoint {
failpoint.Inject("get_binding_return_nil", func() {
bindings = nil
})
}
failpoint.Inject("get_binding_return_nil_always", func() {
bindings = nil
})
if bindings != nil {
for _, binding := range bindings {
numWildcards, matched := fuzzyMatchBindingTableName(sctx.GetSessionVars().CurrentDB, tableNames, binding.TableNames)
if matched && numWildcards > 0 && sctx != nil && !enableFuzzyBinding {
Expand All @@ -112,11 +129,19 @@ func (fbc *fuzzyBindingCache) getFromMemory(sctx sessionctx.Context, fuzzyDigest
return matchedBinding, isMatched, missingSQLDigest
}

func (fbc *fuzzyBindingCache) loadFromStore(missingSQLDigest []string) {
func (fbc *fuzzyBindingCache) loadFromStore(sctx sessionctx.Context, missingSQLDigest []string) {
failpoint.Inject("load_binding_nothing", func() {
failpoint.Return()
})
for _, sqlDigest := range missingSQLDigest {
bindings, err := fbc.loadBindingFromStorageFunc(sqlDigest)
start := time.Now()
bindings, err := fbc.loadBindingFromStorageFunc(sctx, sqlDigest)
if err != nil {
logutil.BgLogger().Warn("loadBindingFromStorageFunc binding failed", zap.String("sqlDigest", sqlDigest), zap.Error(err))
logutil.BgLogger().Warn("loadBindingFromStorageFunc failed",
zap.String("sqlDigest", sqlDigest),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
continue
}
// put binding into the cache
Expand Down Expand Up @@ -317,9 +342,6 @@ func (c *bindingCache) delete(key bindingCacheKey) bool {
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
func (c *bindingCache) GetBinding(sqlDigest string) Bindings {
failpoint.Inject("get_binding_return_nil", func(_ failpoint.Value) {
failpoint.Return(nil)
})
c.lock.Lock()
defer c.lock.Unlock()
return c.get(bindingCacheKey(sqlDigest))
Expand Down
7 changes: 6 additions & 1 deletion pkg/bindinfo/binding_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,14 @@ func matchSQLBinding(sctx sessionctx.Context, stmtNode ast.StmtNode, info *Bindi
if globalHandle == nil {
return
}
if binding, matched := globalHandle.MatchGlobalBinding(sctx, fuzzyDigest, tableNames); matched {
binding, matched, warn := globalHandle.MatchGlobalBinding(sctx, fuzzyDigest, tableNames)
if warn != nil {
sctx.GetSessionVars().StmtCtx.AppendWarning(warn)
}
if matched {
return binding, matched, metrics.ScopeGlobal
}

return
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/bindinfo/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,9 @@ func TestBindingSource(t *testing.T) {
bindHandle := dom.BindHandle()
stmt, _, _ := internal.UtilNormalizeWithDefaultDB(t, "select * from t where a > ?")
_, fuzzyDigest := norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
require.Equal(t, "select * from `test` . `t` where `a` > ?", binding.OriginalSQL)
require.Equal(t, bindinfo.Manual, binding.Source)

Expand All @@ -350,8 +351,9 @@ func TestBindingSource(t *testing.T) {
bindHandle.CaptureBaselines()
stmt, _, _ = internal.UtilNormalizeWithDefaultDB(t, "select * from t where a < ?")
_, fuzzyDigest = norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched = bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn = bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
require.Equal(t, "select * from `test` . `t` where `a` < ?", binding.OriginalSQL)
require.Equal(t, bindinfo.Capture, binding.Source)
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -46,7 +47,7 @@ type GlobalBindingHandle interface {
// Methods for create, get, drop global sql bindings.

// MatchGlobalBinding returns the matched binding for this statement.
MatchGlobalBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool)
MatchGlobalBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, warn error)

// GetAllGlobalBindings returns all bind records in cache.
GetAllGlobalBindings() (bindings Bindings)
Expand Down Expand Up @@ -465,7 +466,7 @@ func (h *globalBindingHandle) Size() int {
}

// MatchGlobalBinding returns the matched binding for this statement.
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
func (h *globalBindingHandle) MatchGlobalBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool, warn error) {
return h.getCache().FuzzyMatchingBinding(sctx, fuzzyDigest, tableNames)
}

Expand Down Expand Up @@ -687,33 +688,38 @@ func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]any, er
}

// LoadBindingsFromStorageToCache loads global bindings from storage to the memory cache.
func (h *globalBindingHandle) LoadBindingsFromStorage(sqlDigest string) (Bindings, error) {
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) (Bindings, error) {
if sqlDigest == "" {
return nil, nil
}
timeout := time.Duration(sctx.GetSessionVars().LoadBindingTimeout) * time.Millisecond
resultChan := h.syncBindingSingleflight.DoChan(sqlDigest, func() (any, error) {
return h.loadBindingsFromStorageInternal(sqlDigest)
})
select {
case result := <-resultChan:
if result.Err != nil {
logutil.BgLogger().Warn("fail to LoadBindingsFromStorageToCache", zap.Error(err))
return nil, result.Err
}
bindings := result.Val
if bindings == nil {
return nil, nil
}
return bindings.(Bindings), nil
case <-time.After(1 * time.Second):
case <-time.After(timeout):
return nil, errors.New("load bindings from storage timeout")
}
}

func (h *globalBindingHandle) loadBindingsFromStorageInternal(sqlDigest string) (any, error) {
failpoint.Inject("load_bindings_from_storage_internal_timeout", func() {
time.Sleep(time.Second)
})

var bindings Bindings
selectStmt := fmt.Sprintf("SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info where sql_digest = '%s'", sqlDigest)
err := h.callWithSCtx(false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, "SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info where sql_digest = ?", sqlDigest)
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/bindinfo/global_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func TestBindingLastUpdateTime(t *testing.T) {
require.NoError(t, err)

_, fuzzyDigest := norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
updateTime := binding.UpdateTime.String()

rows1 := tk.MustQuery("show status like 'last_plan_binding_update_time';").Rows()
Expand Down Expand Up @@ -139,8 +140,9 @@ func TestBindParse(t *testing.T) {
stmt, err := parser.New().ParseOneStmt("select * from test . t", "", "")
require.NoError(t, err)
_, fuzzyDigest := norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn := bindHandle.MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
require.Equal(t, "select * from `test` . `t`", binding.OriginalSQL)
require.Equal(t, "select * from `test` . `t` use index(index_t)", binding.BindSQL)
require.Equal(t, "test", binding.Db)
Expand Down Expand Up @@ -439,8 +441,9 @@ func TestGlobalBinding(t *testing.T) {
stmt, _, _ := internal.UtilNormalizeWithDefaultDB(t, testSQL.querySQL)

_, fuzzyDigest := norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched := dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn := dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
require.Equal(t, testSQL.originSQL, binding.OriginalSQL)
require.Equal(t, testSQL.bindSQL, binding.BindSQL)
require.Equal(t, "test", binding.Db)
Expand Down Expand Up @@ -472,8 +475,9 @@ func TestGlobalBinding(t *testing.T) {
require.Equal(t, 1, bindHandle.Size())

_, fuzzyDigest = norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
binding, matched = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
binding, matched, warn = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.True(t, matched)
require.NoError(t, warn)
require.Equal(t, testSQL.originSQL, binding.OriginalSQL)
require.Equal(t, testSQL.bindSQL, binding.BindSQL)
require.Equal(t, "test", binding.Db)
Expand All @@ -487,17 +491,18 @@ func TestGlobalBinding(t *testing.T) {
require.Equal(t, uint64(1), tk.Session().AffectedRows())
require.NoError(t, err)
_, fuzzyDigest = norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
_, matched = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
_, matched, warn = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.False(t, matched) // dropped

require.NoError(t, warn)
bindHandle = bindinfo.NewGlobalBindingHandle(&mockSessionPool{tk.Session()})
err = bindHandle.LoadFromStorageToCache(true)
require.NoError(t, err)
require.Equal(t, 0, bindHandle.Size())

_, fuzzyDigest = norm.NormalizeStmtForBinding(stmt, norm.WithFuzz(true))
_, matched = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
_, matched, warn = dom.BindHandle().MatchGlobalBinding(tk.Session(), fuzzyDigest, bindinfo.CollectTableNames(stmt))
require.False(t, matched) // dropped
require.NoError(t, warn)

rs, err = tk.Exec("show global bindings")
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func (h *sessionBindingHandle) DropSessionBinding(sqlDigest string) error {

// MatchSessionBinding returns the matched binding for this statement.
func (h *sessionBindingHandle) MatchSessionBinding(sctx sessionctx.Context, fuzzyDigest string, tableNames []*ast.TableName) (matchedBinding Binding, isMatched bool) {
return h.ch.FuzzyMatchingBinding(sctx, fuzzyDigest, tableNames)
matchedBinding, isMatched, _ = h.ch.FuzzyMatchingBinding(sctx, fuzzyDigest, tableNames)
return
}

// GetAllSessionBindings return all session bind info.
Expand Down
Loading

0 comments on commit b898638

Please sign in to comment.