Skip to content

Commit

Permalink
Merge branch 'master' into forbidBaselineEvolution
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Aug 4, 2021
2 parents 98ba826 + c3f92c2 commit 0a8ef1e
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 117 deletions.
37 changes: 37 additions & 0 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2240,3 +2240,40 @@ func (s *testSuite) TestBindingLastUpdateTime(c *C) {
c.Assert(updateTime2, Equals, updateTime)
tk.MustQuery(`show global status like 'last_plan_binding_update_time';`).Check(testkit.Rows())
}

func (s *testSuite) TestGCBindRecord(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, key(a))")

tk.MustExec("create global binding for select * from t where a = 1 using select * from t use index(a) where a = 1")
rows := tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?")
c.Assert(rows[0][3], Equals, "using")
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
"using",
))

h := s.domain.BindHandle()
// bindinfo.Lease is set to 0 for test env in SetUpSuite.
c.Assert(h.GCBindRecord(), IsNil)
rows = tk.MustQuery("show global bindings").Rows()
c.Assert(len(rows), Equals, 1)
c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?")
c.Assert(rows[0][3], Equals, "using")
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
"using",
))

tk.MustExec("drop global binding for select * from t where a = 1")
tk.MustQuery("show global bindings").Check(testkit.Rows())
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
"deleted",
))
c.Assert(h.GCBindRecord(), IsNil)
tk.MustQuery("show global bindings").Check(testkit.Rows())
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows())
}
39 changes: 39 additions & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,45 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
return err
}

// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Lock()
h.sctx.Lock()
defer func() {
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return err
}
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
if err != nil {
return
}
}()

// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
}

// To make sure that all the deleted bind records have been acknowledged to all tidb,
// we only garbage collect those records with update_time before 10 leases.
updateTime := time.Now().Add(-(10 * Lease))
updateTimeStr := types.NewTime(types.FromGoTime(updateTime), mysql.TypeTimestamp, 3).String()
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE status = 'deleted' and update_time < %?`, updateTimeStr)
return err
}

// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a
// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held.
// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is
Expand Down
25 changes: 19 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,12 +990,13 @@ func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve
return err
}

do.globalBindHandleWorkerLoop()
do.handleEvolvePlanTasksLoop(ctxForEvolve)
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
do.globalBindHandleWorkerLoop(owner)
do.handleEvolvePlanTasksLoop(ctxForEvolve, owner)
return nil
}

func (do *Domain) globalBindHandleWorkerLoop() {
func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) {
do.wg.Add(1)
go func() {
defer func() {
Expand All @@ -1004,10 +1005,15 @@ func (do *Domain) globalBindHandleWorkerLoop() {
util.Recover(metrics.LabelDomain, "globalBindHandleWorkerLoop", nil, false)
}()
bindWorkerTicker := time.NewTicker(bindinfo.Lease)
defer bindWorkerTicker.Stop()
gcBindTicker := time.NewTicker(100 * bindinfo.Lease)
defer func() {
bindWorkerTicker.Stop()
gcBindTicker.Stop()
}()
for {
select {
case <-do.exit:
owner.Cancel()
return
case <-bindWorkerTicker.C:
err := do.bindHandle.Update(false)
Expand All @@ -1019,20 +1025,27 @@ func (do *Domain) globalBindHandleWorkerLoop() {
do.bindHandle.CaptureBaselines()
}
do.bindHandle.SaveEvolveTasksToStore()
case <-gcBindTicker.C:
if !owner.IsOwner() {
continue
}
err := do.bindHandle.GCBindRecord()
if err != nil {
logutil.BgLogger().Error("GC bind record failed", zap.Error(err))
}
}
}
}()
}

func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context) {
func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner.Manager) {
do.wg.Add(1)
go func() {
defer func() {
do.wg.Done()
logutil.BgLogger().Info("handleEvolvePlanTasksLoop exited.")
util.Recover(metrics.LabelDomain, "handleEvolvePlanTasksLoop", nil, false)
}()
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
for {
select {
case <-do.exit:
Expand Down
14 changes: 11 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,9 +1591,7 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TablePlacementPolicy),
strings.ToLower(infoschema.TableClientErrorsSummaryGlobal),
strings.ToLower(infoschema.TableClientErrorsSummaryByUser),
strings.ToLower(infoschema.TableClientErrorsSummaryByHost),
strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks):
strings.ToLower(infoschema.TableClientErrorsSummaryByHost):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand Down Expand Up @@ -1621,6 +1619,16 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
retriever: &deadlocksTableRetriever{
table: v.Table,
columns: v.Columns,
},
}
case strings.ToLower(infoschema.TableStatementsSummary),
strings.ToLower(infoschema.TableStatementsSummaryHistory),
strings.ToLower(infoschema.ClusterTableStatementsSummaryHistory),
Expand Down
17 changes: 11 additions & 6 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8252,7 +8252,7 @@ func (s *testSerialSuite) TestIssue24210(c *C) {
c.Assert(err, IsNil)
}

func (s *testSerialSuite) TestDeadlockTable(c *C) {
func (s *testSerialSuite) TestDeadlocksTable(c *C) {
deadlockhistory.GlobalDeadlockHistory.Clear()
deadlockhistory.GlobalDeadlockHistory.Resize(10)

Expand Down Expand Up @@ -8307,14 +8307,19 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) {
id1 := strconv.FormatUint(rec.ID, 10)
id2 := strconv.FormatUint(rec2.ID, 10)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/sqlDigestRetrieverSkipRetrieveGlobal", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/sqlDigestRetrieverSkipRetrieveGlobal"), IsNil)
}()

tk := testkit.NewTestKit(c, s.store)
tk.MustQuery("select * from information_schema.deadlocks").Check(
testutil.RowsWithSep("/",
id1+"/2021-05-10 01:02:03.456789/0/101/aabbccdd/6B31/<nil>/102",
id1+"/2021-05-10 01:02:03.456789/0/102/ddccbbaa/6B32/<nil>/101",
id2+"/2022-06-11 02:03:04.987654/1/201/<nil>/<nil>/<nil>/202",
id2+"/2022-06-11 02:03:04.987654/1/202/<nil>/<nil>/<nil>/203",
id2+"/2022-06-11 02:03:04.987654/1/203/<nil>/<nil>/<nil>/201",
id1+"/2021-05-10 01:02:03.456789/0/101/aabbccdd/<nil>/6B31/<nil>/102",
id1+"/2021-05-10 01:02:03.456789/0/102/ddccbbaa/<nil>/6B32/<nil>/101",
id2+"/2022-06-11 02:03:04.987654/1/201/<nil>/<nil>/<nil>/<nil>/202",
id2+"/2022-06-11 02:03:04.987654/1/202/<nil>/<nil>/<nil>/<nil>/203",
id2+"/2022-06-11 02:03:04.987654/1/203/<nil>/<nil>/<nil>/<nil>/201",
))
}

Expand Down
Loading

0 comments on commit 0a8ef1e

Please sign in to comment.