Skip to content

Commit

Permalink
*: add TxnManager to manage txn in session (#30574)
Browse files Browse the repository at this point in the history
* *: add TxnManager to manage txn in session

* modify

* add tests

* move failpoint content to a single file
  • Loading branch information
lcwangchao authored Dec 22, 2021
1 parent 3bd732f commit 529ce88
Show file tree
Hide file tree
Showing 17 changed files with 1,090 additions and 40 deletions.
4 changes: 3 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ func (t *testExecInfo) compileSQL(idx int) (err error) {
compiler := executor.Compiler{Ctx: c.session}
se := c.session
ctx := context.TODO()
se.PrepareTxnCtx(ctx)
if err = se.PrepareTxnCtx(ctx); err != nil {
return err
}
sctx := se.(sessionctx.Context)
if err = executor.ResetContextOfStmt(sctx, c.rawStmt); err != nil {
return errors.Trace(err)
Expand Down
26 changes: 26 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -263,6 +264,12 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
a.PsStmt.Executor = newExecutor
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
Expand Down Expand Up @@ -298,6 +305,16 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

failpoint.Inject("assertTxnManagerInRebuildPlan", func() {
if is, ok := a.Ctx.Value(sessiontxn.AssertTxnInfoSchemaAfterRetryKey).(infoschema.InfoSchema); ok {
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is)
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil)
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
})

a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
Expand Down Expand Up @@ -755,6 +772,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()

failpoint.Inject("assertTxnManagerAfterPessimisticLockErrorRetry", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
})

if err = e.Open(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -809,6 +830,11 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return nil, errors.Trace(b.err)
}

failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
})

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build(b)
Expand Down
13 changes: 12 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
)

var (
Expand Down Expand Up @@ -57,11 +58,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm

ret := &plannercore.PreprocessorReturn{}
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe))
err := plannercore.Preprocess(c.Ctx,
stmtNode,
plannercore.WithPreprocessorReturn(ret),
plannercore.WithExecuteInfoSchemaUpdate(pe),
plannercore.InitTxnContextProvider,
)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema)
})

finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1718,7 +1717,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.MemTracker.SetActionOnExceed(action)
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := planner.GetPreparedStmt(execStmt, vars)
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -31,6 +32,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -340,6 +342,11 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
return nil, false, false, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Expand Down
3 changes: 2 additions & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func TestPrepared(t *testing.T) {
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
tk.Session().PrepareTxnCtx(ctx)
err = tk.Session().PrepareTxnCtx(ctx)
require.NoError(t, err)
_, err = stmt.RebuildPlan(ctx)
require.NoError(t, err)
rs, err = stmt.Exec(ctx)
Expand Down
20 changes: 20 additions & 0 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -213,3 +214,22 @@ type CachedPrepareStmt struct {
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
}

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*CachedPrepareStmt, error) {
var ok bool
execID := stmt.ExecID
if stmt.Name != "" {
if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok {
return nil, ErrStmtNotFound
}
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
preparedObj, ok := preparedPointer.(*CachedPrepareStmt)
if !ok {
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
return preparedObj, nil
}
return nil, ErrStmtNotFound
}
52 changes: 52 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
Expand All @@ -59,6 +60,11 @@ func InTxnRetry(p *preprocessor) {
p.flag |= inTxnRetry
}

// InitTxnContextProvider is a PreprocessOpt that indicates preprocess should init transaction's context
func InitTxnContextProvider(p *preprocessor) {
p.flag |= initTxnContextProvider
}

// WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocessorReturn.
func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt {
return func(p *preprocessor) {
Expand Down Expand Up @@ -117,6 +123,9 @@ func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...Preproce
node.Accept(&v)
// InfoSchema must be non-nil after preprocessing
v.ensureInfoSchema()

v.initTxnContextProviderIfNecessary(node)

return errors.Trace(v.err)
}

Expand All @@ -136,6 +145,8 @@ const (
// inSequenceFunction is set when visiting a sequence function.
// This flag indicates the tableName in these function should be checked as sequence object.
inSequenceFunction
// initTxnContextProvider is set when we should init txn context in preprocess
initTxnContextProvider
)

// Make linter happy.
Expand Down Expand Up @@ -193,6 +204,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
// handle the insert table name imminently
// insert into t with t ..., the insert can not see t here. We should hand it before the CTE statement
p.handleTableName(node.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName))
case *ast.ExecuteStmt:
p.stmtTp = TypeExecute
p.resolveExecuteStmt(node)
case *ast.CreateTableStmt:
p.stmtTp = TypeCreate
p.flag |= inCreateOrDropTable
Expand Down Expand Up @@ -361,6 +375,8 @@ const (
TypeRepair
// TypeShow for ShowStmt
TypeShow
// TypeExecute for ExecuteStmt
TypeExecute
)

func bindableStmtType(node ast.StmtNode) byte {
Expand Down Expand Up @@ -1489,6 +1505,32 @@ func (p *preprocessor) resolveShowStmt(node *ast.ShowStmt) {
}
}

func (p *preprocessor) resolveExecuteStmt(node *ast.ExecuteStmt) {
prepared, err := GetPreparedStmt(node, p.ctx.GetSessionVars())
if err != nil {
p.err = err
return
}

if prepared.SnapshotTSEvaluator != nil {
snapshotTS, err := prepared.SnapshotTSEvaluator(p.ctx)
if err != nil {
p.err = err
return
}

is, err := domain.GetDomain(p.ctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
p.err = err
return
}

p.LastSnapshotTS = snapshotTS
p.initedLastSnapshotTS = true
p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is)
}
}

func (p *preprocessor) resolveCreateTableStmt(node *ast.CreateTableStmt) {
for _, val := range node.Constraints {
if val.Refer != nil && val.Refer.Table.Schema.String() == "" {
Expand Down Expand Up @@ -1689,3 +1731,13 @@ func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema {
p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema)
return p.InfoSchema
}

func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) {
if p.err != nil || p.flag&initTxnContextProvider == 0 {
return
}

p.err = sessiontxn.GetTxnManager(p.ctx).SetContextProvider(&sessiontxn.SimpleTxnContextProvider{
InfoSchema: p.ensureInfoSchema(),
})
}
21 changes: 1 addition & 20 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,10 @@ import (
"go.uber.org/zap"
)

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*plannercore.CachedPrepareStmt, error) {
var ok bool
execID := stmt.ExecID
if stmt.Name != "" {
if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok {
return nil, plannercore.ErrStmtNotFound
}
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
if !ok {
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
return preparedObj, nil
}
return nil, plannercore.ErrStmtNotFound
}

// IsReadOnly check whether the ast.Node is a read only statement.
func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool {
if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt {
prepareStmt, err := GetPreparedStmt(execStmt, vars)
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
logutil.BgLogger().Warn("GetPreparedStmt failed", zap.Error(err))
return false
Expand Down
Loading

0 comments on commit 529ce88

Please sign in to comment.