Skip to content

Commit

Permalink
planner: remove some useless code in the binding package (pingcap#49250)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Dec 7, 2023
1 parent be62f75 commit 6f72f88
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 163 deletions.
4 changes: 3 additions & 1 deletion pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
"capture.go",
"handle.go",
"session_handle.go",
"stat.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/pkg/bindinfo",
Expand Down Expand Up @@ -39,6 +38,7 @@ go_library(
"//pkg/util/sqlexec",
"//pkg/util/stmtsummary/v2:stmtsummary",
"//pkg/util/table-filter",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_exp//maps",
"@org_uber_go_zap//:zap",
Expand Down Expand Up @@ -69,12 +69,14 @@ go_test(
"//pkg/parser/auth",
"//pkg/parser/model",
"//pkg/server",
"//pkg/session/types",
"//pkg/sessionctx/variable",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/hack",
"//pkg/util/parser",
"//pkg/util/stmtsummary",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
Expand Down
144 changes: 33 additions & 111 deletions pkg/bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ import (

// BindHandle is used to handle all global sql bind operations.
type BindHandle struct {
// TODO: use a session pool instead of a single session.
sctx sessionctx.Context // used to run SQLs
sctxLock sync.Mutex // protect the sctx
sPool SessionPool

bindingCache atomic.Pointer[bindCache]

Expand Down Expand Up @@ -85,9 +83,9 @@ type bindRecordUpdate struct {
}

// NewBindHandle creates a new BindHandle.
func NewBindHandle(ctx sessionctx.Context) *BindHandle {
handle := &BindHandle{}
handle.Reset(ctx)
func NewBindHandle(sPool SessionPool) *BindHandle {
handle := &BindHandle{sPool: sPool}
handle.Reset()
return handle
}

Expand All @@ -101,8 +99,7 @@ func (h *BindHandle) setCache(c *bindCache) {
}

// Reset is to reset the BindHandle and clean old info.
func (h *BindHandle) Reset(ctx sessionctx.Context) {
h.sctx = ctx
func (h *BindHandle) Reset() {
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
Expand Down Expand Up @@ -246,85 +243,6 @@ func (h *BindHandle) CreateGlobalBinding(sctx sessionctx.Context, record *BindRe
})
}

// AddGlobalBinding adds a BindRecord to the storage and BindRecord to the cache.
func (h *BindHandle) AddGlobalBinding(sctx sessionctx.Context, record *BindRecord) (err error) {
err = record.prepareHints(sctx)
if err != nil {
return err
}

record.Db = strings.ToLower(record.Db)
oldRecord := h.GetGlobalBinding(parser.DigestNormalized(record.OriginalSQL).String(), record.OriginalSQL, record.Db)
var duplicateBinding *Binding
if oldRecord != nil {
binding := oldRecord.FindBinding(record.Bindings[0].ID)
if binding != nil {
// There is already a binding with status `Enabled`, `Disabled`, `PendingVerify` or `Rejected`, we could directly cancel the job.
if record.Bindings[0].Status == PendingVerify {
return nil
}
// Otherwise, we need to remove it before insert.
duplicateBinding = binding
}
}

return h.callWithSCtx(true, func(sctx sessionctx.Context) error {
defer func() {
h.appendGlobalCacheBinding(parser.DigestNormalized(record.OriginalSQL).String(), record)
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(sctx); err != nil {
return err
}
if duplicateBinding != nil {
_, err = exec(sctx, `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
}

now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
for i := range record.Bindings {
if duplicateBinding != nil {
record.Bindings[i].CreateTime = duplicateBinding.CreateTime
} else {
record.Bindings[i].CreateTime = now
}
record.Bindings[i].UpdateTime = now

if record.Bindings[i].SQLDigest == "" {
parser4binding := parser.New()
var originNode ast.StmtNode
originNode, err = parser4binding.ParseOneStmt(record.OriginalSQL, record.Bindings[i].Charset, record.Bindings[i].Collation)
if err != nil {
return err
}
_, sqlDigestWithDB := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(originNode, record.Db, record.OriginalSQL))
record.Bindings[i].SQLDigest = sqlDigestWithDB.String()
}
// Insert the BindRecord to the storage.
_, err = exec(sctx, `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
record.Bindings[i].SQLDigest,
record.Bindings[i].PlanDigest,
)
if err != nil {
return err
}
}
return nil
})
}

// DropGlobalBinding drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropGlobalBinding(originalSQL, db string, binding *Binding) (deletedRows uint64, err error) {
err = h.callWithSCtx(false, func(sctx sessionctx.Context) error {
Expand Down Expand Up @@ -593,7 +511,7 @@ func newBindRecord(sctx sessionctx.Context, row chunk.Row) (string, *BindRecord,
if status == Using {
status = Enabled
}
hint := Binding{
binding := Binding{
BindSQL: row.GetString(1),
Status: status,
CreateTime: row.GetTime(4),
Expand All @@ -607,7 +525,7 @@ func newBindRecord(sctx sessionctx.Context, row chunk.Row) (string, *BindRecord,
bindRecord := &BindRecord{
OriginalSQL: row.GetString(0),
Db: strings.ToLower(row.GetString(2)),
Bindings: []Binding{hint},
Bindings: []Binding{binding},
}
sqlDigest := parser.DigestNormalized(bindRecord.OriginalSQL)
sctx.GetSessionVars().CurrentDB = bindRecord.Db
Expand All @@ -631,24 +549,6 @@ func (h *BindHandle) setGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
updateMetrics(metrics.ScopeGlobal, oldRecord, meta, false)
}

// appendGlobalCacheBinding adds the BindRecord to the cache, all the stale BindRecords are
// removed from the cache after this operation.
func (h *BindHandle) appendGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
newCache, err0 := h.getCache().Copy()
if err0 != nil {
logutil.BgLogger().Warn("BindHandle.appendBindRecord", zap.String("category", "sql-bind"), zap.Error(err0))
}
oldRecord := newCache.GetBinding(sqlDigest, meta.OriginalSQL, meta.Db)
newRecord := merge(oldRecord, meta)
err1 := newCache.SetBinding(sqlDigest, newRecord)
if err1 != nil && err0 == nil {
// Only need to handle the error once.
logutil.BgLogger().Warn("BindHandle.appendBindRecord", zap.String("category", "sql-bind"), zap.Error(err1))
}
h.setCache(newCache) // TODO: update it in place
updateMetrics(metrics.ScopeGlobal, oldRecord, newRecord, false)
}

// removeGlobalCacheBinding removes the BindRecord from the cache.
func (h *BindHandle) removeGlobalCacheBinding(sqlDigest string, meta *BindRecord) {
newCache, err := h.getCache().Copy()
Expand Down Expand Up @@ -801,11 +701,17 @@ func (h *BindHandle) ReloadGlobalBindings() error {
}

func (h *BindHandle) callWithSCtx(wrapTxn bool, f func(sctx sessionctx.Context) error) (err error) {
h.sctxLock.Lock()
defer h.sctxLock.Unlock()
resource, err := h.sPool.Get()
if err != nil {
return err
}
defer func() {
if err == nil { // only recycle when no error
h.sPool.Put(resource)
}
}()

sctx := h.sctx
// TODO: how to clean the sctx if meet any error?
sctx := resource.(sessionctx.Context)
if wrapTxn {
if _, err = exec(sctx, "BEGIN PESSIMISTIC"); err != nil {
return
Expand All @@ -823,3 +729,19 @@ func (h *BindHandle) callWithSCtx(wrapTxn bool, f func(sctx sessionctx.Context)
err = f(sctx)
return
}

var (
lastPlanBindingUpdateTime = "last_plan_binding_update_time"
)

// GetScope gets the status variables scope.
func (*BindHandle) GetScope(_ string) variable.ScopeFlag {
return variable.ScopeSession
}

// Stats returns the server statistics.
func (h *BindHandle) Stats(_ *variable.SessionVars) (map[string]interface{}, error) {
m := make(map[string]interface{})
m[lastPlanBindingUpdateTime] = h.getLastUpdateTime().String()
return m, nil
}
21 changes: 17 additions & 4 deletions pkg/bindinfo/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"fmt"
"testing"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/bindinfo/internal"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/testkit"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -63,7 +65,7 @@ func TestBindingLastUpdateTime(t *testing.T) {
tk.MustExec("create global binding for select * from t0 using select * from t0 use index(a);")
tk.MustExec("admin reload bindings;")

bindHandle := bindinfo.NewBindHandle(tk.Session())
bindHandle := bindinfo.NewBindHandle(&mockSessionPool{tk.Session()})
err := bindHandle.Update(true)
require.NoError(t, err)
sql, sqlDigest := parser.NormalizeDigest("select * from test . t0")
Expand Down Expand Up @@ -126,7 +128,7 @@ func TestBindParse(t *testing.T) {
sql := fmt.Sprintf(`INSERT INTO mysql.bind_info(original_sql,bind_sql,default_db,status,create_time,update_time,charset,collation,source, sql_digest, plan_digest) VALUES ('%s', '%s', '%s', '%s', NOW(), NOW(),'%s', '%s', '%s', '%s', '%s')`,
originSQL, bindSQL, defaultDb, status, charset, collation, source, mockDigest, mockDigest)
tk.MustExec(sql)
bindHandle := bindinfo.NewBindHandle(tk.Session())
bindHandle := bindinfo.NewBindHandle(&mockSessionPool{tk.Session()})
err := bindHandle.Update(true)
require.NoError(t, err)
require.Equal(t, 1, bindHandle.Size())
Expand Down Expand Up @@ -475,7 +477,7 @@ func TestGlobalBinding(t *testing.T) {
require.NotNil(t, row.GetString(6))
require.NotNil(t, row.GetString(7))

bindHandle := bindinfo.NewBindHandle(tk.Session())
bindHandle := bindinfo.NewBindHandle(&mockSessionPool{tk.Session()})
err = bindHandle.Update(true)
require.NoError(t, err)
require.Equal(t, 1, bindHandle.Size())
Expand Down Expand Up @@ -506,7 +508,7 @@ func TestGlobalBinding(t *testing.T) {
// From newly created global bind handle.
require.Equal(t, testSQL.memoryUsage, pb.GetGauge().GetValue())

bindHandle = bindinfo.NewBindHandle(tk.Session())
bindHandle = bindinfo.NewBindHandle(&mockSessionPool{tk.Session()})
err = bindHandle.Update(true)
require.NoError(t, err)
require.Equal(t, 0, bindHandle.Size())
Expand Down Expand Up @@ -598,3 +600,14 @@ func TestRemoveDuplicatedPseudoBinding(t *testing.T) {
removeDuplicated()
checkPseudoBinding(1)
}

type mockSessionPool struct {
se sessiontypes.Session
}

func (p *mockSessionPool) Get() (pools.Resource, error) {
return p.se, nil
}

func (p *mockSessionPool) Put(pools.Resource) {
}
35 changes: 0 additions & 35 deletions pkg/bindinfo/stat.go

This file was deleted.

15 changes: 5 additions & 10 deletions pkg/bindinfo/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ package bindinfo
import (
"context"

"github.com/ngaut/pools"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
Expand All @@ -45,13 +45,8 @@ func execRows(sctx sessionctx.Context, sql string, args ...interface{}) (rows []
[]sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, args...)
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(sctx sessionctx.Context, err error) error {
if err == nil {
_, _, err = execRows(sctx, "COMMIT")
} else {
_, _, err1 := execRows(sctx, "ROLLBACK")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
// SessionPool is used to recycle sessionctx.
type SessionPool interface {
Get() (pools.Resource, error)
Put(pools.Resource)
}
4 changes: 2 additions & 2 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,8 +1841,8 @@ func (do *Domain) BindHandle() *bindinfo.BindHandle {
func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve sessionctx.Context) error {
ctxForHandle.GetSessionVars().InRestrictedSQL = true
ctxForEvolve.GetSessionVars().InRestrictedSQL = true
if !do.bindHandle.CompareAndSwap(nil, bindinfo.NewBindHandle(ctxForHandle)) {
do.bindHandle.Load().Reset(ctxForHandle)
if !do.bindHandle.CompareAndSwap(nil, bindinfo.NewBindHandle(do.sysSessionPool)) {
do.bindHandle.Load().Reset()
}

err := do.bindHandle.Load().Update(true)
Expand Down

0 comments on commit 6f72f88

Please sign in to comment.