Skip to content

Commit

Permalink
Merge branch 'release-4.0' into release-4.0-eea52f5f2091
Browse files Browse the repository at this point in the history
  • Loading branch information
wjhuang2016 committed May 11, 2021
2 parents c812b77 + db79876 commit 31b13ee
Show file tree
Hide file tree
Showing 202 changed files with 6,326 additions and 1,808 deletions.
4 changes: 4 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
component/executor:
- distsql/*
- executor/*
- !executor/brie*
- util/chunk/*
- util/disk/*
- util/execdetails/*
Expand Down Expand Up @@ -31,3 +32,6 @@ component/DDL:

component/config:
- config/*

sig/migrate:
- executor/brie*
36 changes: 36 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,28 @@ func (s *testSuite) TestCapturePlanBaselineIgnoreTiFlash(c *C) {
c.Assert(rows[0][1], Equals, "SELECT /*+ use_index(@`sel_1` `test`.`t` )*/ * FROM `test`.`t`")
}

func (s *testSuite) TestSPMHitInfo(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1(id int)")
tk.MustExec("create table t2(id int)")

c.Assert(tk.HasPlan("SELECT * from t1,t2 where t1.id = t2.id", "HashJoin"), IsTrue)
c.Assert(tk.HasPlan("SELECT /*+ TIDB_SMJ(t1, t2) */ * from t1,t2 where t1.id = t2.id", "MergeJoin"), IsTrue)

tk.MustExec("SELECT * from t1,t2 where t1.id = t2.id")
tk.MustQuery(`select @@last_plan_from_binding;`).Check(testkit.Rows("0"))
tk.MustExec("create global binding for SELECT * from t1,t2 where t1.id = t2.id using SELECT /*+ TIDB_SMJ(t1, t2) */ * from t1,t2 where t1.id = t2.id")

c.Assert(tk.HasPlan("SELECT * from t1,t2 where t1.id = t2.id", "MergeJoin"), IsTrue)
tk.MustExec("SELECT * from t1,t2 where t1.id = t2.id")
tk.MustQuery(`select @@last_plan_from_binding;`).Check(testkit.Rows("1"))
tk.MustExec("drop global binding for SELECT * from t1,t2 where t1.id = t2.id")
}

func (s *testSuite) TestNotEvolvePlanForReadStorageHint(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
Expand Down Expand Up @@ -1843,3 +1865,17 @@ func (s *testSuite) TestCaptureWithZeroSlowLogThreshold(c *C) {
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from test . t")
}

func (s *testSuite) TestSPMWithoutUseDatabase(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk1 := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
s.cleanBindingEnv(tk1)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, key(a))")
tk.MustExec("create global binding for select * from t using select * from t force index(a)")

err := tk1.ExecToErr("select * from t")
c.Assert(err, ErrorMatches, "*No database selected")
}
144 changes: 68 additions & 76 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/pingcap/parser/format"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -123,17 +122,21 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
updateTime := lastUpdateTime.String()
if fullLoad {
updateTime = "0000-00-00 00:00:00"
}

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
sql += " where update_time > \"" + lastUpdateTime.String() + "\""
exec := h.sctx.Context.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source
FROM mysql.bind_info WHERE update_time > %? ORDER BY update_time`, updateTime)
if err != nil {
return err
}
// We need to apply the updates by order, wrong apply order of same original sql may cause inconsistent state.
sql += " order by update_time"

// No need to acquire the session context lock for ExecRestrictedSQL, it
// No need to acquire the session context lock for ExecRestrictedStmt, it
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
rows, _, err := exec.ExecRestrictedStmt(context.Background(), stmt)
if err != nil {
h.bindInfo.Unlock()
return err
Expand Down Expand Up @@ -215,7 +218,7 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %?`, record.OriginalSQL)
if err != nil {
return err
}
Expand All @@ -227,7 +230,17 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?,%?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -289,7 +302,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE original_sql = %? AND bind_sql = %?`, record.OriginalSQL, duplicateBinding.BindSQL)
if err != nil {
return err
}
Expand All @@ -305,7 +318,17 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
record.Bindings[i].UpdateTime = now

// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
_, err = exec.ExecuteInternal(context.TODO(), `INSERT INTO mysql.bind_info VALUES (%?, %?, %?, %?, %?, %?, %?, %?, %?)`,
record.OriginalSQL,
record.Bindings[i].BindSQL,
record.Db,
record.Bindings[i].Status,
record.Bindings[i].CreateTime.String(),
record.Bindings[i].UpdateTime.String(),
record.Bindings[i].Charset,
record.Bindings[i].Collation,
record.Bindings[i].Source,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -349,17 +372,19 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
return err
}

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
if binding == nil {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %?`,
deleted, updateTs, originalSQL, updateTs)
} else {
_, err = exec.ExecuteInternal(context.TODO(), `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %?`,
deleted, updateTs, originalSQL, updateTs, binding.BindSQL)
}

_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}
Expand Down Expand Up @@ -575,49 +600,13 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
return nil
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s`,
expression.Quote(normdOrigSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
return fmt.Sprintf(`INSERT INTO mysql.bind_info VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)`,
expression.Quote(orignalSQL),
expression.Quote(info.BindSQL),
expression.Quote(db),
expression.Quote(info.Status),
expression.Quote(info.CreateTime.String()),
expression.Quote(info.UpdateTime.String()),
expression.Quote(info.Charset),
expression.Quote(info.Collation),
expression.Quote(info.Source),
)
}

// LockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) LockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
sql, err := sqlexec.EscapeSQL("UPDATE mysql.bind_info SET source= %? WHERE original_sql= %?", Builtin, BuiltinPseudoSQL4BindLock)
if err != nil {
return ""
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindingSQL))
return sql
}

// CaptureBaselines is used to automatically capture plan baselines.
Expand Down Expand Up @@ -661,16 +650,14 @@ func (h *BindHandle) CaptureBaselines() {
func getHintsForSQL(sctx sessionctx.Context, sql string) (string, error) {
oriVals := sctx.GetSessionVars().UsePlanBaselines
sctx.GetSessionVars().UsePlanBaselines = false
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("explain format='hint' %s", sql))
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(context.TODO(), fmt.Sprintf("explain format='hint' %s", sql))
sctx.GetSessionVars().UsePlanBaselines = oriVals
if len(recordSets) > 0 {
defer terror.Log(recordSets[0].Close())
}
if err != nil {
return "", err
}
chk := recordSets[0].NewChunk()
err = recordSets[0].Next(context.TODO(), chk)
defer terror.Call(rs.Close)
chk := rs.NewChunk()
err = rs.Next(context.TODO(), chk)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -766,9 +753,17 @@ func (h *BindHandle) SaveEvolveTasksToStore() {
}

func getEvolveParameters(ctx sessionctx.Context) (time.Duration, time.Time, time.Time, error) {
sql := fmt.Sprintf("select variable_name, variable_value from mysql.global_variables where variable_name in ('%s', '%s', '%s')",
variable.TiDBEvolvePlanTaskMaxTime, variable.TiDBEvolvePlanTaskStartTime, variable.TiDBEvolvePlanTaskEndTime)
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
stmt, err := ctx.(sqlexec.RestrictedSQLExecutor).ParseWithParams(
context.TODO(),
"SELECT variable_name, variable_value FROM mysql.global_variables WHERE variable_name IN (%?, %?, %?)",
variable.TiDBEvolvePlanTaskMaxTime,
variable.TiDBEvolvePlanTaskStartTime,
variable.TiDBEvolvePlanTaskEndTime,
)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
return 0, time.Time{}, time.Time{}, err
}
Expand Down Expand Up @@ -839,7 +834,7 @@ func (h *BindHandle) getOnePendingVerifyJob() (string, string, Binding) {
func (h *BindHandle) getRunningDuration(sctx sessionctx.Context, db, sql string, maxTime time.Duration) (time.Duration, error) {
ctx := context.TODO()
if db != "" {
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, fmt.Sprintf("use `%s`", db))
_, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "use %n", db)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -873,23 +868,20 @@ func runSQL(ctx context.Context, sctx sessionctx.Context, sql string, resultChan
resultChan <- fmt.Errorf("run sql panicked: %v", string(buf))
}
}()
recordSets, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
rs, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, sql)
if err != nil {
if len(recordSets) > 0 {
terror.Call(recordSets[0].Close)
}
terror.Call(rs.Close)
resultChan <- err
return
}
recordSet := recordSets[0]
chk := recordSets[0].NewChunk()
chk := rs.NewChunk()
for {
err = recordSet.Next(ctx, chk)
err = rs.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
break
}
}
terror.Call(recordSets[0].Close)
terror.Call(rs.Close)
resultChan <- err
}

Expand Down
Loading

0 comments on commit 31b13ee

Please sign in to comment.