Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-46f5f217725e
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Mar 16, 2021
2 parents 1d5dd09 + 430a81c commit 3d4bb86
Showing 1 changed file with 59 additions and 62 deletions.
121 changes: 59 additions & 62 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -123,17 +122,21 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
updateTime := lastUpdateTime.String()
if fullLoad {
updateTime = "0000-00-00 00:00:00"
}

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
sql += " where update_time > \"" + lastUpdateTime.String() + "\""
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time`, updateTime)
if err != nil {
return err
}
// We need to apply the updates by order, wrong apply order of same original sql may cause inconsistent state.
sql += " order by update_time"

// No need to acquire the session context lock for ExecRestrictedSQL, it
// No need to acquire the session context lock for ExecRestrictedStmt, it
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
rows, _, err := exec.ExecRestrictedStmt(context.Background(), stmt)
if err != nil {
h.bindInfo.Unlock()
return err
Expand Down Expand Up @@ -215,7 +218,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %?`, record.OriginalSQL)
if err != nil {
return err
}
Expand All @@ -227,7 +230,17 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +302,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
Expand All @@ -305,7 +318,17 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,17 +372,19 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
return err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, originalSQL, updateTs)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL)
}

_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}
Expand Down Expand Up @@ -575,49 +600,13 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s`,
expression.Quote(normdOrigSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)`,
expression.Quote(orignalSQL),
expression.Quote(info.BindSQL),
expression.Quote(db),
expression.Quote(info.Status),
expression.Quote(info.CreateTime.String()),
expression.Quote(info.UpdateTime.String()),
expression.Quote(info.Charset),
expression.Quote(info.Collation),
expression.Quote(info.Source),
)
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) LockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
sql, err := sqlexec.EscapeSQL("UPDATE mysql.bind_info SET source= %? WHERE original_sql= %?", Builtin, BuiltinPseudoSQL4BindLock)
if err != nil {
return ""
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindingSQL))
return sql
}

// CaptureBaselines is used to automatically capture plan baselines.
Expand Down Expand Up @@ -764,9 +753,17 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
variable.TiDBEvolvePlanTaskStartTime,
variable.TiDBEvolvePlanTaskEndTime,
)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
Expand Down Expand Up @@ -837,7 +834,7 @@ func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, fmt.Sprintf("use `%s`", db))
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "use %n", db)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 3d4bb86

Please sign in to comment.