Skip to content

Commit

Permalink
cherry pick pingcap#11981 to release-3.0
Browse files Browse the repository at this point in the history
Signed-off-by: sre-bot <sre-bot@pingcap.com>
  • Loading branch information
crazycs520 authored and sre-bot committed Apr 13, 2020
1 parent 31f87b8 commit dcda31e
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 3 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (a *ExecStmt) SummaryStmt() {
// IsPointGetWithPKOrUniqueKeyByAutoCommit returns true when meets following conditions:
// 1. ctx is auto commit tagged
// 2. txn is not valid
// 2. plan is point get by pk, or point get by unique index (no double read)
// 3. plan is point get by pk, or point get by unique index (no double read)
func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannercore.Plan) (bool, error) {
// check auto commit
if !ctx.GetSessionVars().IsAutocommit() {
Expand Down
142 changes: 142 additions & 0 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,37 @@ import (
func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, error) {
fp := plannercore.TryFastPlan(sctx, node)
if fp != nil {
if !isPointGetWithoutDoubleRead(sctx, fp) {
sctx.PrepareTxnFuture(ctx)
}
return fp, nil
}

<<<<<<< HEAD
=======
sctx.PrepareTxnFuture(ctx)

var oriHint *bindinfo.HintsSet
if stmtNode, ok := node.(ast.StmtNode); ok {
oriHint = addHint(sctx, stmtNode)
}
plan, err := optimize(ctx, sctx, node, is)
// Restore the original hint in case of prepare stmt.
if oriHint != nil {
node = bindinfo.BindHint(node.(ast.StmtNode), oriHint)
if err != nil {
handleInvalidBindRecord(ctx, sctx, node.(ast.StmtNode))
}
}
if err == nil || oriHint == nil {
return plan, err
}
// Reoptimize after restore the original hint.
return optimize(ctx, sctx, node, is)
}

func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is infoschema.InfoSchema) (plannercore.Plan, error) {
>>>>>>> dffe293... *: not send tso request when point get with max tso (#11981)
// build logical plan
sctx.GetSessionVars().PlanID = 0
sctx.GetSessionVars().PlanColumnID = 0
Expand Down Expand Up @@ -71,6 +99,120 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
return plannercore.DoOptimize(ctx, builder.GetOptFlag(), logic)
}

<<<<<<< HEAD
=======
func extractSelectAndNormalizeDigest(stmtNode ast.StmtNode) (*ast.SelectStmt, string, string) {
switch x := stmtNode.(type) {
case *ast.ExplainStmt:
switch x.Stmt.(type) {
case *ast.SelectStmt:
normalizeExplainSQL := parser.Normalize(x.Text())
idx := strings.Index(normalizeExplainSQL, "select")
normalizeSQL := normalizeExplainSQL[idx:]
hash := parser.DigestHash(normalizeSQL)
return x.Stmt.(*ast.SelectStmt), normalizeSQL, hash
}
case *ast.SelectStmt:
normalizedSQL, hash := parser.NormalizeDigest(x.Text())
return x, normalizedSQL, hash
}
return nil, "", ""
}

func addHint(ctx sessionctx.Context, stmtNode ast.StmtNode) *bindinfo.HintsSet {
// When the domain is initializing, the bind will be nil.
if ctx.Value(bindinfo.SessionBindInfoKeyType) == nil {
return nil
}
selectStmt, normalizedSQL, hash := extractSelectAndNormalizeDigest(stmtNode)
if selectStmt == nil {
return nil
}
return addHintForSelect(ctx, selectStmt, normalizedSQL, hash)
}

func addHintForSelect(ctx sessionctx.Context, stmt ast.StmtNode, normdOrigSQL, hash string) *bindinfo.HintsSet {
sessionHandle := ctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
bindRecord := sessionHandle.GetBindRecord(normdOrigSQL, ctx.GetSessionVars().CurrentDB)
if bindRecord != nil {
if bindRecord.Status == bindinfo.Invalid {
return nil
}
if bindRecord.Status == bindinfo.Using {
metrics.BindUsageCounter.WithLabelValues(metrics.ScopeSession).Inc()
oriHint := bindinfo.CollectHint(stmt)
bindinfo.BindHint(stmt, bindRecord.HintsSet)
return oriHint
}
}
globalHandle := domain.GetDomain(ctx).BindHandle()
bindRecord = globalHandle.GetBindRecord(hash, normdOrigSQL, ctx.GetSessionVars().CurrentDB)
if bindRecord == nil {
bindRecord = globalHandle.GetBindRecord(hash, normdOrigSQL, "")
}
if bindRecord != nil {
metrics.BindUsageCounter.WithLabelValues(metrics.ScopeGlobal).Inc()
oriHint := bindinfo.CollectHint(stmt)
bindinfo.BindHint(stmt, bindRecord.HintsSet)
return oriHint
}
return nil
}

func handleInvalidBindRecord(ctx context.Context, sctx sessionctx.Context, stmtNode ast.StmtNode) {
selectStmt, normdOrigSQL, hash := extractSelectAndNormalizeDigest(stmtNode)
if selectStmt == nil {
return
}
sessionHandle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
bindMeta := sessionHandle.GetBindRecord(normdOrigSQL, sctx.GetSessionVars().CurrentDB)
if bindMeta != nil {
bindMeta.Status = bindinfo.Invalid
return
}

globalHandle := domain.GetDomain(sctx).BindHandle()
bindMeta = globalHandle.GetBindRecord(hash, normdOrigSQL, sctx.GetSessionVars().CurrentDB)
if bindMeta == nil {
bindMeta = globalHandle.GetBindRecord(hash, normdOrigSQL, "")
}
if bindMeta != nil {
record := &bindinfo.BindRecord{
OriginalSQL: bindMeta.OriginalSQL,
BindSQL: bindMeta.BindSQL,
Db: sctx.GetSessionVars().CurrentDB,
Charset: bindMeta.Charset,
Collation: bindMeta.Collation,
Status: bindinfo.Invalid,
}

err := sessionHandle.AddBindRecord(record)
if err != nil {
logutil.Logger(ctx).Warn("handleInvalidBindRecord failed", zap.Error(err))
}

globalHandle := domain.GetDomain(sctx).BindHandle()
dropBindRecord := &bindinfo.BindRecord{
OriginalSQL: bindMeta.OriginalSQL,
Db: bindMeta.Db,
}
globalHandle.AddDropInvalidBindTask(dropBindRecord)
}
}

// isPointGetWithoutDoubleRead returns true when meets following conditions:
// 1. ctx is auto commit tagged.
// 2. plan is point get by pk.
func isPointGetWithoutDoubleRead(ctx sessionctx.Context, p plannercore.Plan) bool {
if !ctx.GetSessionVars().IsAutocommit() {
return false
}

v, ok := p.(*plannercore.PointGetPlan)
return ok && v.IndexInfo == nil
}

>>>>>>> dffe293... *: not send tso request when point get with max tso (#11981)
func init() {
plannercore.OptimizeAstNode = Optimize
}
13 changes: 11 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
// NewPrepareExec may need startTS to build the executor, for example prepare statement has subquery in int.
// So we have to call PrepareTxnCtx here.
s.PrepareTxnCtx(ctx)
s.PrepareTxnFuture(ctx)
prepareExec := executor.NewPrepareExec(s, executor.GetInfoSchema(s), sql)
err = prepareExec.Next(ctx, nil)
if err != nil {
Expand Down Expand Up @@ -1805,8 +1806,6 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
return
}

txnFuture := s.getTxnFuture(ctx)
s.txn.changeInvalidToPending(txnFuture)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
InfoSchema: is,
Expand All @@ -1823,6 +1822,16 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
}
}

// PrepareTxnFuture uses to try to get txn future.
func (s *session) PrepareTxnFuture(ctx context.Context) {
if s.txn.validOrPending() {
return
}

txnFuture := s.getTxnFuture(ctx)
s.txn.changeInvalidToPending(txnFuture)
}

// RefreshTxnCtx implements context.RefreshTxnCtx interface.
func (s *session) RefreshTxnCtx(ctx context.Context) error {
if err := s.doCommit(ctx); err != nil {
Expand Down
19 changes: 19 additions & 0 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ type Context interface {
StmtAddDirtyTableOP(op int, physicalID int64, handle int64)
// DDLOwnerChecker returns owner.DDLOwnerChecker.
DDLOwnerChecker() owner.DDLOwnerChecker
<<<<<<< HEAD
=======
// AddTableLock adds table lock to the session lock map.
AddTableLock([]model.TableLockTpInfo)
// ReleaseTableLocks releases table locks in the session lock map.
ReleaseTableLocks(locks []model.TableLockTpInfo)
// ReleaseTableLockByTableID releases table locks in the session lock map by table ID.
ReleaseTableLockByTableIDs(tableIDs []int64)
// CheckTableLocked checks the table lock.
CheckTableLocked(tblID int64) (bool, model.TableLockType)
// GetAllTableLocks gets all table locks table id and db id hold by the session.
GetAllTableLocks() []model.TableLockTpInfo
// ReleaseAllTableLocks releases all table locks hold by the session.
ReleaseAllTableLocks()
// HasLockedTables uses to check whether this session locked any tables.
HasLockedTables() bool
// PrepareTxnFuture uses to prepare txn by future.
PrepareTxnFuture(ctx context.Context)
>>>>>>> dffe293... *: not send tso request when point get with max tso (#11981)
}

type basicCtxType int
Expand Down
43 changes: 43 additions & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,50 @@ func (c *Context) StmtGetMutation(tableID int64) *binlog.TableMutation {
}

// StmtAddDirtyTableOP implements the sessionctx.Context interface.
<<<<<<< HEAD
func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64) {
=======
func (c *Context) StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum) {
}

// AddTableLock implements the sessionctx.Context interface.
func (c *Context) AddTableLock(_ []model.TableLockTpInfo) {
}

// ReleaseTableLocks implements the sessionctx.Context interface.
func (c *Context) ReleaseTableLocks(locks []model.TableLockTpInfo) {
}

// ReleaseTableLockByTableIDs implements the sessionctx.Context interface.
func (c *Context) ReleaseTableLockByTableIDs(tableIDs []int64) {
}

// CheckTableLocked implements the sessionctx.Context interface.
func (c *Context) CheckTableLocked(_ int64) (bool, model.TableLockType) {
return false, model.TableLockNone
}

// GetAllTableLocks implements the sessionctx.Context interface.
func (c *Context) GetAllTableLocks() []model.TableLockTpInfo {
return nil
}

// ReleaseAllTableLocks implements the sessionctx.Context interface.
func (c *Context) ReleaseAllTableLocks() {
}

// HasLockedTables implements the sessionctx.Context interface.
func (c *Context) HasLockedTables() bool {
return false
}

// PrepareTxnFuture implements the sessionctx.Context interface.
func (c *Context) PrepareTxnFuture(ctx context.Context) {
}

// Close implements the sessionctx.Context interface.
func (c *Context) Close() {
>>>>>>> dffe293... *: not send tso request when point get with max tso (#11981)
}

// NewContext creates a new mocked sessionctx.Context.
Expand Down

0 comments on commit dcda31e

Please sign in to comment.