From 28a2a07fe9d2cc1bfa214e7938eb6b0d98985fc0 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Wed, 9 Dec 2020 02:52:24 +0800 Subject: [PATCH] bindinfo: sync concurrent ops on mysql.bind_info from multiple tidb instances --- bindinfo/bind_test.go | 4 +- bindinfo/cache.go | 2 + bindinfo/handle.go | 161 ++++++++++++++++++++++++------------------ session/bootstrap.go | 42 ++++++++--- session/session.go | 2 +- 5 files changed, 128 insertions(+), 83 deletions(-) diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go index e85ac5fba683d..c02ba6eb51964 100644 --- a/bindinfo/bind_test.go +++ b/bindinfo/bind_test.go @@ -139,7 +139,7 @@ func (s *testSuite) TearDownTest(c *C) { } func (s *testSuite) cleanBindingEnv(tk *testkit.TestKit) { - tk.MustExec("truncate table mysql.bind_info") + tk.MustExec("delete from mysql.bind_info where source != 'builtin'") s.domain.BindHandle().Clear() } @@ -1304,7 +1304,7 @@ func (s *testSuite) TestEvolveInvalidBindings(c *C) { // Manufacture a rejected binding by hacking mysql.bind_info. tk.MustExec("insert into mysql.bind_info values('select * from t where a > ?', 'select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10', 'test', 'rejected', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" + bindinfo.Manual + "')") - tk.MustQuery("select bind_sql, status from mysql.bind_info").Sort().Check(testkit.Rows( + tk.MustQuery("select bind_sql, status from mysql.bind_info where source != 'builtin'").Sort().Check(testkit.Rows( "select /*+ USE_INDEX(t) */ * from t where a > 10 using", "select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10 rejected", )) diff --git a/bindinfo/cache.go b/bindinfo/cache.go index a7f216d7ca951..d0857bcd10288 100644 --- a/bindinfo/cache.go +++ b/bindinfo/cache.go @@ -42,6 +42,8 @@ const ( Capture = "capture" // Evolve indicates the binding is evolved by TiDB from old bindings. Evolve = "evolve" + // Builtin indicates the binding is a builtin record for internal locking purpose. It is also the status for the builtin binding. + Builtin = "builtin" ) // Binding stores the basic bind hint info. diff --git a/bindinfo/handle.go b/bindinfo/handle.go index fb5f529e92adc..7d8da8f28f4df 100644 --- a/bindinfo/handle.go +++ b/bindinfo/handle.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util/chunk" @@ -94,6 +93,8 @@ const ( OwnerKey = "/tidb/bindinfo/owner" // Prompt is the prompt for bindinfo owner manager. Prompt = "bindinfo" + // BuiltinPseudoSQL4BindLock is used to simulate LOCK TABLE for mysql.bind_info. + BuiltinPseudoSQL4BindLock = "builtin_pseudo_sql_for_bind_lock" ) type bindRecordUpdate struct { @@ -123,7 +124,6 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle { func (h *BindHandle) Update(fullLoad bool) (err error) { h.bindInfo.Lock() lastUpdateTime := h.bindInfo.lastUpdateTime - h.bindInfo.Unlock() sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info" if !fullLoad { @@ -136,11 +136,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) { // uses another background session. rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql) if err != nil { + h.bindInfo.Unlock() return err } - // Make sure there is only one goroutine writes the cache. - h.bindInfo.Lock() newCache := h.bindInfo.Value.Load().(cache).copy() defer func() { h.bindInfo.lastUpdateTime = lastUpdateTime @@ -149,6 +148,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) { }() for _, row := range rows { + // Skip the builtin record which is designed for binding synchronization. + if row.GetString(0) == BuiltinPseudoSQL4BindLock { + continue + } hash, meta, err := h.newBindRecord(row) // Update lastUpdateTime to the newest one. if meta.Bindings[0].UpdateTime.Compare(lastUpdateTime) > 0 { @@ -179,65 +182,51 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor return err } - exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + h.bindInfo.Lock() h.sctx.Lock() - _, err = exec.ExecuteInternal(context.TODO(), "BEGIN") - if err != nil { + defer func() { h.sctx.Unlock() + h.bindInfo.Unlock() + }() + exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC") + if err != nil { return } - normalizedSQL := parser.DigestNormalized(record.OriginalSQL) - oldRecord := h.GetBindRecord(normalizedSQL, record.OriginalSQL, record.Db) - defer func() { if err != nil { _, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK") - h.sctx.Unlock() terror.Log(err1) return } _, err = exec.ExecuteInternal(context.TODO(), "COMMIT") - h.sctx.Unlock() if err != nil { return } - // Make sure there is only one goroutine writes the cache and uses parser. - h.bindInfo.Lock() - if oldRecord != nil { - h.removeBindRecord(normalizedSQL, oldRecord) - } - h.appendBindRecord(normalizedSQL, record) - h.bindInfo.Unlock() + sqlDigest := parser.DigestNormalized(record.OriginalSQL) + h.setBindRecord(sqlDigest, record) }() - var txn kv.Transaction - txn, err = h.sctx.Context.Txn(true) + // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. + if err = h.lockBindInfoTable(); err != nil { + return err + } + // Binding recreation should physically delete previous bindings. + _, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, "")) if err != nil { return err } - now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3) - if oldRecord != nil { - for _, binding := range oldRecord.Bindings { - // Binding recreation should physically delete previous bindings, since marking them as deleted may - // cause unexpected binding caches if there are concurrent CREATE BINDING on multiple tidb instances, - // because the record with `using` status is not guaranteed to have larger update_time than those records - // with `deleted` status. - _, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, binding.BindSQL)) - if err != nil { - return err - } - } - } + now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3) for i := range record.Bindings { record.Bindings[i].CreateTime = now record.Bindings[i].UpdateTime = now - // insert the BindRecord to the storage. + // Insert the BindRecord to the storage. _, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i])) if err != nil { return err @@ -267,40 +256,37 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) } } - exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + h.bindInfo.Lock() h.sctx.Lock() - _, err = exec.ExecuteInternal(context.TODO(), "BEGIN") - if err != nil { + defer func() { h.sctx.Unlock() + h.bindInfo.Unlock() + }() + exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC") + if err != nil { return } defer func() { if err != nil { _, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK") - h.sctx.Unlock() terror.Log(err1) return } _, err = exec.ExecuteInternal(context.TODO(), "COMMIT") - h.sctx.Unlock() if err != nil { return } - // Make sure there is only one goroutine writes the cache and uses parser. - h.bindInfo.Lock() h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record) - h.bindInfo.Unlock() }() - var txn kv.Transaction - txn, err = h.sctx.Context.Txn(true) - if err != nil { + // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. + if err = h.lockBindInfoTable(); err != nil { return err } - if duplicateBinding != nil { _, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL)) if err != nil { @@ -308,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) } } - now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3) + now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3) for i := range record.Bindings { if duplicateBinding != nil { record.Bindings[i].CreateTime = duplicateBinding.CreateTime @@ -317,7 +303,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) } record.Bindings[i].UpdateTime = now - // insert the BindRecord to the storage. + // Insert the BindRecord to the storage. _, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i])) if err != nil { return err @@ -328,25 +314,27 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord) // DropBindRecord drops a BindRecord to the storage and BindRecord int the cache. func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (err error) { - exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + h.bindInfo.Lock() h.sctx.Lock() - _, err = exec.ExecuteInternal(context.TODO(), "BEGIN") - if err != nil { + defer func() { h.sctx.Unlock() - return + h.bindInfo.Unlock() + }() + exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + _, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC") + if err != nil { + return err } - + var deleteRows int defer func() { if err != nil { _, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK") - h.sctx.Unlock() terror.Log(err1) return } _, err = exec.ExecuteInternal(context.TODO(), "COMMIT") - h.sctx.Unlock() - if err != nil { + if err != nil || deleteRows == 0 { return } @@ -354,18 +342,15 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e if binding != nil { record.Bindings = append(record.Bindings, *binding) } - // Make sure there is only one goroutine writes the cache and uses parser. - h.bindInfo.Lock() h.removeBindRecord(parser.DigestNormalized(originalSQL), record) - h.bindInfo.Unlock() }() - txn, err1 := h.sctx.Context.Txn(true) - if err1 != nil { - return err1 + // Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances. + if err = h.lockBindInfoTable(); err != nil { + return } - updateTs := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3) + updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3) bindSQL := "" if binding != nil { @@ -373,6 +358,20 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e } _, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL)) + deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows()) + return err +} + +// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a +// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held. +// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is +// generally available later. +// This lock would enforce the CREATE / DROP GLOBAL BINDING statements to be executed sequentially, +// even if they come from different tidb instances. +func (h *BindHandle) lockBindInfoTable() error { + // h.sctx already locked. + exec, _ := h.sctx.Context.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(context.TODO(), h.lockBindInfoSQL()) return err } @@ -483,6 +482,16 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) { return hash, bindRecord, err } +// setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord, +// it will be overridden. +func (h *BindHandle) setBindRecord(hash string, meta *BindRecord) { + newCache := h.bindInfo.Value.Load().(cache).copy() + oldRecord := newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db) + newCache.setBindRecord(hash, meta) + h.bindInfo.Value.Store(newCache) + updateMetrics(metrics.ScopeGlobal, oldRecord, meta, false) +} + // appendBindRecord addes the BindRecord to the cache, all the stale BindRecords are // removed from the cache after this operation. func (h *BindHandle) appendBindRecord(hash string, meta *BindRecord) { @@ -565,12 +574,15 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord { } func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string { - return fmt.Sprintf( - `DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`, + sql := fmt.Sprintf( + `DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s`, expression.Quote(normdOrigSQL), expression.Quote(db), - expression.Quote(bindSQL), ) + if bindSQL == "" { + return sql + } + return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL)) } func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string { @@ -587,12 +599,21 @@ func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Bindin ) } +// lockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction. +func (h *BindHandle) lockBindInfoSQL() string { + return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s", + expression.Quote(Builtin), + expression.Quote(BuiltinPseudoSQL4BindLock)) +} + func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string { - sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s`, + updateTsStr := updateTs.String() + sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s and update_time<%s`, expression.Quote(deleted), - expression.Quote(updateTs.String()), + expression.Quote(updateTsStr), expression.Quote(originalSQL), - expression.Quote(db)) + expression.Quote(db), + expression.Quote(updateTsStr)) if bindingSQL == "" { return sql } diff --git a/session/bootstrap.go b/session/bootstrap.go index 3722aaf668994..9c913aef2da8c 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/parser/auth" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" @@ -234,15 +235,15 @@ const ( // CreateBindInfoTable stores the sql bind info which is used to update globalBindCache. CreateBindInfoTable = `CREATE TABLE IF NOT EXISTS mysql.bind_info ( - original_sql TEXT NOT NULL , - bind_sql TEXT NOT NULL , - default_db TEXT NOT NULL, - status TEXT NOT NULL, - create_time TIMESTAMP(3) NOT NULL, - update_time TIMESTAMP(3) NOT NULL, - charset TEXT NOT NULL, - collation TEXT NOT NULL, - source VARCHAR(10) NOT NULL DEFAULT 'unknown', + original_sql TEXT NOT NULL, + bind_sql TEXT NOT NULL, + default_db TEXT NOT NULL, + status TEXT NOT NULL, + create_time TIMESTAMP(3) NOT NULL, + update_time TIMESTAMP(3) NOT NULL, + charset TEXT NOT NULL, + collation TEXT NOT NULL, + source VARCHAR(10) NOT NULL DEFAULT 'unknown', INDEX sql_index(original_sql(1024),default_db(1024)) COMMENT "accelerate the speed when add global binding query", INDEX time_index(update_time) COMMENT "accelerate the speed when querying with last update time" ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` @@ -437,6 +438,8 @@ const ( version55 = 55 // version56 fixes the bug that upgradeToVer49 would be missed when upgrading from v4.0 to a new version version56 = 56 + // version57 fixes the bug of concurrent create / drop binding + version57 = 57 ) var ( @@ -497,6 +500,7 @@ var ( upgradeToVer54, upgradeToVer55, upgradeToVer56, + upgradeToVer57, } ) @@ -1228,6 +1232,24 @@ func upgradeToVer56(s Session, ver int64) { doReentrantDDL(s, CreateStatsExtended) } +func upgradeToVer57(s Session, ver int64) { + if ver >= version57 { + return + } + insertBuiltinBindInfoRow(s) +} + +func initBindInfoTable(s Session) { + mustExecute(s, CreateBindInfoTable) + insertBuiltinBindInfoRow(s) +} + +func insertBuiltinBindInfoRow(s Session) { + sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`, + bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin) + mustExecute(s, sql) +} + func writeMemoryQuotaQuery(s Session) { comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x" sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`, @@ -1291,7 +1313,7 @@ func doDDLWorks(s Session) { // Create default_roles table. mustExecute(s, CreateDefaultRolesTable) // Create bind_info table. - mustExecute(s, CreateBindInfoTable) + initBindInfoTable(s) // Create stats_topn_store table. mustExecute(s, CreateStatsTopNTable) // Create expr_pushdown_blacklist table. diff --git a/session/session.go b/session/session.go index 34072792a2d1b..b67669a3d5073 100644 --- a/session/session.go +++ b/session/session.go @@ -2133,7 +2133,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er const ( notBootstrapped = 0 - currentBootstrapVersion = version56 + currentBootstrapVersion = version57 ) func getStoreBootstrapVersion(store kv.Storage) int64 {