Skip to content

Commit

Permalink
bindinfo: sync concurrent ops on mysql.bind_info from multiple tidb i…
Browse files Browse the repository at this point in the history
…nstances
  • Loading branch information
eurekaka committed Dec 9, 2020
1 parent dfbb69d commit 28a2a07
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 83 deletions.
4 changes: 2 additions & 2 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *testSuite) TearDownTest(c *C) {
}

func (s *testSuite) cleanBindingEnv(tk *testkit.TestKit) {
tk.MustExec("truncate table mysql.bind_info")
tk.MustExec("delete from mysql.bind_info where source != 'builtin'")
s.domain.BindHandle().Clear()
}

Expand Down Expand Up @@ -1304,7 +1304,7 @@ func (s *testSuite) TestEvolveInvalidBindings(c *C) {
// Manufacture a rejected binding by hacking mysql.bind_info.
tk.MustExec("insert into mysql.bind_info values('select * from t where a > ?', 'select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10', 'test', 'rejected', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "')")
tk.MustQuery("select bind_sql, status from mysql.bind_info").Sort().Check(testkit.Rows(
tk.MustQuery("select bind_sql, status from mysql.bind_info where source != 'builtin'").Sort().Check(testkit.Rows(
"select /*+ USE_INDEX(t) */ * from t where a > 10 using",
"select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10 rejected",
))
Expand Down
2 changes: 2 additions & 0 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
Capture = "capture"
// Evolve indicates the binding is evolved by TiDB from old bindings.
Evolve = "evolve"
// Builtin indicates the binding is a builtin record for internal locking purpose. It is also the status for the builtin binding.
Builtin = "builtin"
)

// Binding stores the basic bind hint info.
Expand Down
161 changes: 91 additions & 70 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -94,6 +93,8 @@ const (
OwnerKey = "/tidb/bindinfo/owner"
// Prompt is the prompt for bindinfo owner manager.
Prompt = "bindinfo"
// BuiltinPseudoSQL4BindLock is used to simulate LOCK TABLE for mysql.bind_info.
BuiltinPseudoSQL4BindLock = "builtin_pseudo_sql_for_bind_lock"
)

type bindRecordUpdate struct {
Expand Down Expand Up @@ -123,7 +124,6 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
h.bindInfo.Unlock()

sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
Expand All @@ -136,11 +136,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
h.bindInfo.Unlock()
return err
}

// Make sure there is only one goroutine writes the cache.
h.bindInfo.Lock()
newCache := h.bindInfo.Value.Load().(cache).copy()
defer func() {
h.bindInfo.lastUpdateTime = lastUpdateTime
Expand All @@ -149,6 +148,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
}()

for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
hash, meta, err := h.newBindRecord(row)
// Update lastUpdateTime to the newest one.
if meta.Bindings[0].UpdateTime.Compare(lastUpdateTime) > 0 {
Expand Down Expand Up @@ -179,65 +182,51 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}

exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return
}

normalizedSQL := parser.DigestNormalized(record.OriginalSQL)
oldRecord := h.GetBindRecord(normalizedSQL, record.OriginalSQL, record.Db)

defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
return
}

// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
if oldRecord != nil {
h.removeBindRecord(normalizedSQL, oldRecord)
}
h.appendBindRecord(normalizedSQL, record)
h.bindInfo.Unlock()
sqlDigest := parser.DigestNormalized(record.OriginalSQL)
h.setBindRecord(sqlDigest, record)
}()

var txn kv.Transaction
txn, err = h.sctx.Context.Txn(true)
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
if err != nil {
return err
}
now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)

if oldRecord != nil {
for _, binding := range oldRecord.Bindings {
// Binding recreation should physically delete previous bindings, since marking them as deleted may
// cause unexpected binding caches if there are concurrent CREATE BINDING on multiple tidb instances,
// because the record with `using` status is not guaranteed to have larger update_time than those records
// with `deleted` status.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, binding.BindSQL))
if err != nil {
return err
}
}
}
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

for i := range record.Bindings {
record.Bindings[i].CreateTime = now
record.Bindings[i].UpdateTime = now

// insert the BindRecord to the storage.
// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
if err != nil {
return err
Expand Down Expand Up @@ -267,48 +256,45 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}
}

exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return
}

defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
return
}

// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.bindInfo.Unlock()
}()

var txn kv.Transaction
txn, err = h.sctx.Context.Txn(true)
if err != nil {
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
}

if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
if err != nil {
return err
}
}

now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
for i := range record.Bindings {
if duplicateBinding != nil {
record.Bindings[i].CreateTime = duplicateBinding.CreateTime
Expand All @@ -317,7 +303,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}
record.Bindings[i].UpdateTime = now

// insert the BindRecord to the storage.
// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
if err != nil {
return err
Expand All @@ -328,51 +314,64 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)

// DropBindRecord drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (err error) {
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
return
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return err
}

var deleteRows int
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}

_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
if err != nil || deleteRows == 0 {
return
}

record := &BindRecord{OriginalSQL: originalSQL, Db: db}
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
h.removeBindRecord(parser.DigestNormalized(originalSQL), record)
h.bindInfo.Unlock()
}()

txn, err1 := h.sctx.Context.Txn(true)
if err1 != nil {
return err1
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
}

updateTs := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)

bindSQL := ""
if binding != nil {
bindSQL = binding.BindSQL
}

_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}

// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a
// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held.
// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is
// generally available later.
// This lock would enforce the CREATE / DROP GLOBAL BINDING statements to be executed sequentially,
// even if they come from different tidb instances.
func (h *BindHandle) lockBindInfoTable() error {
// h.sctx already locked.
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(context.TODO(), h.lockBindInfoSQL())
return err
}

Expand Down Expand Up @@ -483,6 +482,16 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
return hash, bindRecord, err
}

// setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord,
// it will be overridden.
func (h *BindHandle) setBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(cache).copy()
oldRecord := newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db)
newCache.setBindRecord(hash, meta)
h.bindInfo.Value.Store(newCache)
updateMetrics(metrics.ScopeGlobal, oldRecord, meta, false)
}

// appendBindRecord addes the BindRecord to the cache, all the stale BindRecords are
// removed from the cache after this operation.
func (h *BindHandle) appendBindRecord(hash string, meta *BindRecord) {
Expand Down Expand Up @@ -565,12 +574,15 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
}

func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
return fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`,
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
expression.Quote(bindSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}

func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
Expand All @@ -587,12 +599,21 @@ func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Bindin
)
}

// lockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) lockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}

func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s`,
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTs.String()),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(db))
expression.Quote(db),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
}
Expand Down
Loading

0 comments on commit 28a2a07

Please sign in to comment.