diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 3ae2259ddaf07..1873cb02d9fe0 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -411,7 +411,9 @@ func (t *testExecInfo) compileSQL(idx int) (err error) { compiler := executor.Compiler{Ctx: c.session} se := c.session ctx := context.TODO() - se.PrepareTxnCtx(ctx) + if err = se.PrepareTxnCtx(ctx); err != nil { + return err + } sctx := se.(sessionctx.Context) if err = executor.ResetContextOfStmt(sctx, c.rawStmt); err != nil { return errors.Trace(err) diff --git a/executor/adapter.go b/executor/adapter.go index 39e660099ed3f..be83c42d99940 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/plugin" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" @@ -263,6 +264,12 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec a.PsStmt.Executor = newExecutor } pointExecutor := a.PsStmt.Executor.(*PointGetExecutor) + + failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() { + sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true) + sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is) + }) + if err = pointExecutor.Open(ctx); err != nil { terror.Call(pointExecutor.Close) return nil, err @@ -298,6 +305,16 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) { if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil { return 0, err } + + failpoint.Inject("assertTxnManagerInRebuildPlan", func() { + if is, ok := a.Ctx.Value(sessiontxn.AssertTxnInfoSchemaAfterRetryKey).(infoschema.InfoSchema); ok { + a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is) + a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil) + } + sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true) + sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema) + }) + a.InfoSchema = ret.InfoSchema a.SnapshotTS = ret.LastSnapshotTS a.IsStaleness = ret.IsStaleness @@ -755,6 +772,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E a.Ctx.GetSessionVars().StmtCtx.ResetForRetry() a.Ctx.GetSessionVars().RetryInfo.ResetOffset() + failpoint.Inject("assertTxnManagerAfterPessimisticLockErrorRetry", func() { + sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true) + }) + if err = e.Open(ctx); err != nil { return nil, err } @@ -809,6 +830,11 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { return nil, errors.Trace(b.err) } + failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() { + sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true) + sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is) + }) + // ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement. if executorExec, ok := e.(*ExecuteExec); ok { err := executorExec.Build(b) diff --git a/executor/compiler.go b/executor/compiler.go index 74a878b4d3293..baf49979572c2 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" ) var ( @@ -57,11 +58,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm ret := &plannercore.PreprocessorReturn{} pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode} - err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe)) + err := plannercore.Preprocess(c.Ctx, + stmtNode, + plannercore.WithPreprocessorReturn(ret), + plannercore.WithExecuteInfoSchemaUpdate(pe), + plannercore.InitTxnContextProvider, + ) if err != nil { return nil, err } + failpoint.Inject("assertTxnManagerInCompile", func() { + sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true) + sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema) + }) + finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema) if err != nil { return nil, err diff --git a/executor/executor.go b/executor/executor.go index 4338529cea8ac..af46ffc2b999f 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -45,7 +45,6 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/planner" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -1718,7 +1717,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { sc.MemTracker.SetActionOnExceed(action) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { - prepareStmt, err := planner.GetPreparedStmt(execStmt, vars) + prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) if err != nil { return err } diff --git a/executor/prepared.go b/executor/prepared.go index 82a030e76b6c1..3b703c75a9cf5 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" @@ -31,6 +32,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tidb/util" @@ -340,6 +342,11 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context, return nil, false, false, err } + failpoint.Inject("assertTxnManagerInCompile", func() { + sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true) + sessiontxn.AssertTxnManagerInfoSchema(sctx, is) + }) + stmt := &ExecStmt{ GoCtx: ctx, InfoSchema: is, diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go index 42fbf72164e54..a5aafc326d0c5 100644 --- a/executor/seqtest/prepared_test.go +++ b/executor/seqtest/prepared_test.go @@ -163,7 +163,8 @@ func TestPrepared(t *testing.T) { require.Equal(t, query, stmt.OriginText()) // Check that rebuild plan works. - tk.Session().PrepareTxnCtx(ctx) + err = tk.Session().PrepareTxnCtx(ctx) + require.NoError(t, err) _, err = stmt.RebuildPlan(ctx) require.NoError(t, err) rs, err = stmt.Exec(ctx) diff --git a/planner/core/cache.go b/planner/core/cache.go index a386c4a5a3649..4113f3e911e88 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" @@ -213,3 +214,22 @@ type CachedPrepareStmt struct { ForUpdateRead bool SnapshotTSEvaluator func(sessionctx.Context) (uint64, error) } + +// GetPreparedStmt extract the prepared statement from the execute statement. +func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*CachedPrepareStmt, error) { + var ok bool + execID := stmt.ExecID + if stmt.Name != "" { + if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok { + return nil, ErrStmtNotFound + } + } + if preparedPointer, ok := vars.PreparedStmts[execID]; ok { + preparedObj, ok := preparedPointer.(*CachedPrepareStmt) + if !ok { + return nil, errors.Errorf("invalid CachedPrepareStmt type") + } + return preparedObj, nil + } + return nil, ErrStmtNotFound +} diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 101e2a0c53479..6e2f9aaed4f53 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" @@ -59,6 +60,11 @@ func InTxnRetry(p *preprocessor) { p.flag |= inTxnRetry } +// InitTxnContextProvider is a PreprocessOpt that indicates preprocess should init transaction's context +func InitTxnContextProvider(p *preprocessor) { + p.flag |= initTxnContextProvider +} + // WithPreprocessorReturn returns a PreprocessOpt to initialize the PreprocessorReturn. func WithPreprocessorReturn(ret *PreprocessorReturn) PreprocessOpt { return func(p *preprocessor) { @@ -117,6 +123,9 @@ func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...Preproce node.Accept(&v) // InfoSchema must be non-nil after preprocessing v.ensureInfoSchema() + + v.initTxnContextProviderIfNecessary(node) + return errors.Trace(v.err) } @@ -136,6 +145,8 @@ const ( // inSequenceFunction is set when visiting a sequence function. // This flag indicates the tableName in these function should be checked as sequence object. inSequenceFunction + // initTxnContextProvider is set when we should init txn context in preprocess + initTxnContextProvider ) // Make linter happy. @@ -193,6 +204,9 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) { // handle the insert table name imminently // insert into t with t ..., the insert can not see t here. We should hand it before the CTE statement p.handleTableName(node.Table.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName)) + case *ast.ExecuteStmt: + p.stmtTp = TypeExecute + p.resolveExecuteStmt(node) case *ast.CreateTableStmt: p.stmtTp = TypeCreate p.flag |= inCreateOrDropTable @@ -361,6 +375,8 @@ const ( TypeRepair // TypeShow for ShowStmt TypeShow + // TypeExecute for ExecuteStmt + TypeExecute ) func bindableStmtType(node ast.StmtNode) byte { @@ -1489,6 +1505,32 @@ func (p *preprocessor) resolveShowStmt(node *ast.ShowStmt) { } } +func (p *preprocessor) resolveExecuteStmt(node *ast.ExecuteStmt) { + prepared, err := GetPreparedStmt(node, p.ctx.GetSessionVars()) + if err != nil { + p.err = err + return + } + + if prepared.SnapshotTSEvaluator != nil { + snapshotTS, err := prepared.SnapshotTSEvaluator(p.ctx) + if err != nil { + p.err = err + return + } + + is, err := domain.GetDomain(p.ctx).GetSnapshotInfoSchema(snapshotTS) + if err != nil { + p.err = err + return + } + + p.LastSnapshotTS = snapshotTS + p.initedLastSnapshotTS = true + p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is) + } +} + func (p *preprocessor) resolveCreateTableStmt(node *ast.CreateTableStmt) { for _, val := range node.Constraints { if val.Refer != nil && val.Refer.Table.Schema.String() == "" { @@ -1689,3 +1731,13 @@ func (p *preprocessor) ensureInfoSchema() infoschema.InfoSchema { p.InfoSchema = p.ctx.GetInfoSchema().(infoschema.InfoSchema) return p.InfoSchema } + +func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) { + if p.err != nil || p.flag&initTxnContextProvider == 0 { + return + } + + p.err = sessiontxn.GetTxnManager(p.ctx).SetContextProvider(&sessiontxn.SimpleTxnContextProvider{ + InfoSchema: p.ensureInfoSchema(), + }) +} diff --git a/planner/optimize.go b/planner/optimize.go index b16fc09a238f0..e5d82a95d6e49 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -48,29 +48,10 @@ import ( "go.uber.org/zap" ) -// GetPreparedStmt extract the prepared statement from the execute statement. -func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*plannercore.CachedPrepareStmt, error) { - var ok bool - execID := stmt.ExecID - if stmt.Name != "" { - if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok { - return nil, plannercore.ErrStmtNotFound - } - } - if preparedPointer, ok := vars.PreparedStmts[execID]; ok { - preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt) - if !ok { - return nil, errors.Errorf("invalid CachedPrepareStmt type") - } - return preparedObj, nil - } - return nil, plannercore.ErrStmtNotFound -} - // IsReadOnly check whether the ast.Node is a read only statement. func IsReadOnly(node ast.Node, vars *variable.SessionVars) bool { if execStmt, isExecStmt := node.(*ast.ExecuteStmt); isExecStmt { - prepareStmt, err := GetPreparedStmt(execStmt, vars) + prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars) if err != nil { logutil.BgLogger().Warn("GetPreparedStmt failed", zap.Error(err)) return false diff --git a/session/session.go b/session/session.go index 561120ff4b9cc..cd1ab05da5498 100644 --- a/session/session.go +++ b/session/session.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/sessiontxn" "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -153,7 +154,7 @@ type Session interface { // Return the information of the txn current running TxnInfo() *txninfo.TxnInfo // PrepareTxnCtx is exported for test. - PrepareTxnCtx(context.Context) + PrepareTxnCtx(context.Context) error // FieldList returns fields list of a table. FieldList(tableName string) (fields []*ast.ResultField, err error) SetPort(port string) @@ -997,7 +998,9 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { orgStartTS := sessVars.TxnCtx.StartTS label := s.GetSQLLabel() for { - s.PrepareTxnCtx(ctx) + if err = s.PrepareTxnCtx(ctx); err != nil { + return err + } s.sessionVars.RetryInfo.ResetOffset() for i, sr := range nh.history { st := sr.st @@ -1634,7 +1637,10 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex ctx = opentracing.ContextWithSpan(ctx, span1) } - s.PrepareTxnCtx(ctx) + if err := s.PrepareTxnCtx(ctx); err != nil { + return nil, err + } + if err := s.loadCommonGlobalVariablesIfNeeded(); err != nil { return nil, err } @@ -1772,6 +1778,13 @@ func (s *session) hasQuerySpecial() bool { // runStmt executes the sqlexec.Statement and commit or rollback the current transaction. func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.RecordSet, err error) { + failpoint.Inject("assertTxnManagerInRunStmt", func() { + sessiontxn.RecordAssert(se, "assertTxnManagerInRunStmt", true) + if stmt, ok := s.(*executor.ExecStmt); ok { + sessiontxn.AssertTxnManagerInfoSchema(se, stmt.InfoSchema) + } + }) + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("session.runStmt", opentracing.ChildOf(span.Context())) span1.LogKV("sql", s.OriginText()) @@ -1912,7 +1925,9 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields inTxn := s.GetSessionVars().InTxn() // NewPrepareExec may need startTS to build the executor, for example prepare statement has subquery in int. // So we have to call PrepareTxnCtx here. - s.PrepareTxnCtx(ctx) + if err = s.PrepareTxnCtx(ctx); err != nil { + return + } s.PrepareTSFuture(ctx) prepareExec := executor.NewPrepareExec(s, sql) err = prepareExec.Next(ctx, nil) @@ -1929,6 +1944,12 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields func (s *session) preparedStmtExec(ctx context.Context, is infoschema.InfoSchema, snapshotTS uint64, stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) { + + failpoint.Inject("assertTxnManagerInPreparedStmtExec", func() { + sessiontxn.RecordAssert(s, "assertTxnManagerInPreparedStmtExec", true) + sessiontxn.AssertTxnManagerInfoSchema(s, is) + }) + st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, stmtID, is, snapshotTS, args) if err != nil { return nil, err @@ -1951,6 +1972,12 @@ func (s *session) preparedStmtExec(ctx context.Context, func (s *session) cachedPlanExec(ctx context.Context, is infoschema.InfoSchema, snapshotTS uint64, stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) { + + failpoint.Inject("assertTxnManagerInCachedPlanExec", func() { + sessiontxn.RecordAssert(s, "assertTxnManagerInCachedPlanExec", true) + sessiontxn.AssertTxnManagerInfoSchema(s, is) + }) + prepared := prepareStmt.PreparedAst // compile ExecStmt execAst := &ast.ExecuteStmt{ExecID: stmtID} @@ -2066,8 +2093,11 @@ func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore. // ExecutePreparedStmt executes a prepared statement. func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args []types.Datum) (sqlexec.RecordSet, error) { - s.PrepareTxnCtx(ctx) var err error + if err = s.PrepareTxnCtx(ctx); err != nil { + return nil, err + } + s.sessionVars.StartTime = time.Now() preparedPointer, ok := s.sessionVars.PreparedStmts[stmtID] if !ok { @@ -2079,13 +2109,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ if !ok { return nil, errors.Errorf("invalid CachedPrepareStmt type") } - executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL) - ok, err = s.IsCachedExecOk(ctx, preparedStmt) - if err != nil { - return nil, err - } - s.txn.onStmtStart(preparedStmt.SQLDigest.String()) - defer s.txn.onStmtEnd() + var is infoschema.InfoSchema var snapshotTS uint64 if preparedStmt.ForUpdateRead { @@ -2102,6 +2126,22 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ } else { is = s.GetInfoSchema().(infoschema.InfoSchema) } + + txnCtxProvider := &sessiontxn.SimpleTxnContextProvider{ + InfoSchema: is, + } + if err = sessiontxn.GetTxnManager(s).SetContextProvider(txnCtxProvider); err != nil { + return nil, err + } + + executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL) + ok, err = s.IsCachedExecOk(ctx, preparedStmt) + if err != nil { + return nil, err + } + s.txn.onStmtStart(preparedStmt.SQLDigest.String()) + defer s.txn.onStmtEnd() + if ok { return s.cachedPlanExec(ctx, is, snapshotTS, stmtID, preparedStmt, args) } @@ -2843,10 +2883,10 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // PrepareTxnCtx starts a goroutine to begin a transaction if needed, and creates a new transaction context. // It is called before we execute a sql query. -func (s *session) PrepareTxnCtx(ctx context.Context) { +func (s *session) PrepareTxnCtx(ctx context.Context) error { s.currentCtx = ctx if s.txn.validOrPending() { - return + return nil } is := s.GetInfoSchema() @@ -2861,6 +2901,11 @@ func (s *session) PrepareTxnCtx(ctx context.Context) { s.sessionVars.TxnCtx.IsPessimistic = true } } + + txnCtxProvider := &sessiontxn.SimpleTxnContextProvider{ + InfoSchema: is.(infoschema.InfoSchema), + } + return sessiontxn.GetTxnManager(s).SetContextProvider(txnCtxProvider) } // PrepareTSFuture uses to try to get ts future. diff --git a/session/session_test.go b/session/session_test.go index 4602758eeaa28..e70660f172111 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3058,7 +3058,8 @@ func (s *testSchemaSuite) TestDisableTxnAutoRetry(c *C) { // session 1 starts a transaction early. // execute a select statement to clear retry history. tk1.MustExec("select 1") - tk1.Se.PrepareTxnCtx(context.Background()) + err = tk1.Se.PrepareTxnCtx(context.Background()) + c.Assert(err, IsNil) // session 2 update the value. tk2.MustExec("update no_retry set id = 4") // AutoCommit update will retry, so it would not fail. diff --git a/session/txnmanager.go b/session/txnmanager.go new file mode 100644 index 0000000000000..0e47a8cf7406a --- /dev/null +++ b/session/txnmanager.go @@ -0,0 +1,62 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" +) + +func init() { + sessiontxn.GetTxnManager = getTxnManager +} + +func getTxnManager(sctx sessionctx.Context) sessiontxn.TxnManager { + if manager, ok := sctx.GetSessionVars().TxnManager.(sessiontxn.TxnManager); ok { + return manager + } + + manager := newTxnManager(sctx) + sctx.GetSessionVars().TxnManager = manager + return manager +} + +// txnManager implements sessiontxn.TxnManager +type txnManager struct { + sctx sessionctx.Context + + ctxProvider sessiontxn.TxnContextProvider +} + +func newTxnManager(sctx sessionctx.Context) *txnManager { + return &txnManager{sctx: sctx} +} + +func (m *txnManager) GetTxnInfoSchema() infoschema.InfoSchema { + if m.ctxProvider == nil { + return nil + } + return m.ctxProvider.GetTxnInfoSchema() +} + +func (m *txnManager) GetContextProvider() sessiontxn.TxnContextProvider { + return m.ctxProvider +} + +func (m *txnManager) SetContextProvider(provider sessiontxn.TxnContextProvider) error { + m.ctxProvider = provider + return nil +} diff --git a/sessionctx/context.go b/sessionctx/context.go index 2f9a50aa211f6..bc2237e5b9987 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -70,6 +70,7 @@ type Context interface { // ClearValue clears the value associated with this context for key. ClearValue(key fmt.Stringer) + // Deprecated: Use TxnManager.GetTxnInfoSchema to get the current schema in session GetInfoSchema() InfoschemaMetaVersion GetSessionVars() *variable.SessionVars diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index d03cbdce86fde..3b0c8f33402e9 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -474,6 +474,9 @@ type SessionVars struct { // TxnCtx Should be reset on transaction finished. TxnCtx *TransactionContext + // TxnManager is used to manage txn context in session + TxnManager interface{} + // KVVars is the variables for KV storage. KVVars *tikvstore.Variables diff --git a/sessiontxn/failpoint.go b/sessiontxn/failpoint.go new file mode 100644 index 0000000000000..1d0a832de1083 --- /dev/null +++ b/sessiontxn/failpoint.go @@ -0,0 +1,74 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sessiontxn + +import ( + "fmt" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/util/stringutil" +) + +// AssertRecordsKey is used to save failPoint invoke records +// Only for test +var AssertRecordsKey stringutil.StringerStr = "assertTxnManagerRecords" + +// AssertTxnInfoSchemaKey is used to set the expected infoschema that should be check in failPoint +// Only for test +var AssertTxnInfoSchemaKey stringutil.StringerStr = "assertTxnInfoSchemaKey" + +// AssertTxnInfoSchemaAfterRetryKey is used to set the expected infoschema that should be check in failPoint after retry +// Only for test +var AssertTxnInfoSchemaAfterRetryKey stringutil.StringerStr = "assertTxnInfoSchemaAfterRetryKey" + +// RecordAssert is used only for test +func RecordAssert(sctx sessionctx.Context, name string, value interface{}) { + records, ok := sctx.Value(AssertRecordsKey).(map[string]interface{}) + if !ok { + records = make(map[string]interface{}) + sctx.SetValue(AssertRecordsKey, records) + } + records[name] = value +} + +// AssertTxnManagerInfoSchema is used only for test +func AssertTxnManagerInfoSchema(sctx sessionctx.Context, is interface{}) { + assertVersion := func(expected interface{}) { + if expected == nil { + return + } + + expectVer := expected.(infoschema.InfoSchema).SchemaMetaVersion() + gotVer := GetTxnManager(sctx).GetTxnInfoSchema().SchemaMetaVersion() + if gotVer != expectVer { + panic(fmt.Sprintf("Txn schema version not match, expect:%d, got:%d", expectVer, gotVer)) + } + } + + if localTables := sctx.GetSessionVars().LocalTemporaryTables; localTables != nil { + got, ok := GetTxnManager(sctx).GetTxnInfoSchema().(*infoschema.TemporaryTableAttachedInfoSchema) + if !ok { + panic("Expected to be a TemporaryTableAttachedInfoSchema") + } + + if got.LocalTemporaryTables != localTables { + panic("Local tables should be the same with the one in session") + } + } + + assertVersion(is) + assertVersion(sctx.Value(AssertTxnInfoSchemaKey)) +} diff --git a/sessiontxn/interface.go b/sessiontxn/interface.go new file mode 100644 index 0000000000000..fc7357ad10d55 --- /dev/null +++ b/sessiontxn/interface.go @@ -0,0 +1,59 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sessiontxn + +import ( + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/sessionctx" +) + +// TxnContextProvider provides txn context +type TxnContextProvider interface { + // Initialize the provider with session context + Initialize(sctx sessionctx.Context) error + // GetTxnInfoSchema returns the information schema used by txn + GetTxnInfoSchema() infoschema.InfoSchema +} + +// SimpleTxnContextProvider implements TxnContextProvider +// It is only used in refactor stage +// TODO: remove it after refactor finished +type SimpleTxnContextProvider struct { + InfoSchema infoschema.InfoSchema +} + +// Initialize the provider with session context +func (p *SimpleTxnContextProvider) Initialize(_ sessionctx.Context) error { + return nil +} + +// GetTxnInfoSchema returns the information schema used by txn +func (p *SimpleTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema { + return p.InfoSchema +} + +// TxnManager is an interface providing txn context management in session +type TxnManager interface { + // GetTxnInfoSchema returns the information schema used by txn + GetTxnInfoSchema() infoschema.InfoSchema + + // GetContextProvider returns the current TxnContextProvider + GetContextProvider() TxnContextProvider + // SetContextProvider sets the context provider + SetContextProvider(provider TxnContextProvider) error +} + +// GetTxnManager returns the TxnManager object from session context +var GetTxnManager func(sctx sessionctx.Context) TxnManager diff --git a/sessiontxn/txn_context_serial_test.go b/sessiontxn/txn_context_serial_test.go new file mode 100644 index 0000000000000..099af86a0e689 --- /dev/null +++ b/sessiontxn/txn_context_serial_test.go @@ -0,0 +1,706 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sessiontxn_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/testbridge" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + testbridge.WorkaroundGoCheckFlags() + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.etcd.io/etcd/pkg/logutil.(*MergeLogger).outputLoop"), + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + goleak.VerifyTestMain(m, opts...) +} + +func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInCompile", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInRebuildPlan", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerAfterBuildExecutor", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec", "return")) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec", "return")) + + store, do, clean := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.Session().SetValue(sessiontxn.AssertRecordsKey, nil) + tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1,t2") + + tk.MustExec("create table t1 (id int primary key, v int)") + tk.MustExec("insert into t1 values(1, 10)") + + tk.MustExec("create table t2 (id int)") + + tk.MustExec("create temporary table tmp (id int)") + tk.MustExec("insert into tmp values(10)") + + return store, do, func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInCompile")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInRebuildPlan")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterBuildExecutor")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec")) + + tk.Session().SetValue(sessiontxn.AssertRecordsKey, nil) + tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil) + clean() + } +} + +func checkAssertRecordExits(t *testing.T, se sessionctx.Context, name string) { + records, ok := se.Value(sessiontxn.AssertRecordsKey).(map[string]interface{}) + require.True(t, ok, fmt.Sprintf("'%s' not in record, maybe failpoint not enabled?", name)) + _, ok = records[name] + require.True(t, ok, fmt.Sprintf("'%s' not in record", name)) +} + +func doWithCheckPath(t *testing.T, se sessionctx.Context, names []string, do func()) { + se.SetValue(sessiontxn.AssertRecordsKey, nil) + do() + for _, name := range names { + checkAssertRecordExits(t, se, name) + } +} + +var normalPathRecords = []string{ + "assertTxnManagerInCompile", + "assertTxnManagerInRunStmt", + "assertTxnManagerAfterBuildExecutor", +} + +func TestTxnContextForSimpleCases(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + is1 := do.InfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(3)") + }) + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) + + tk2.MustExec("alter table t2 add column(c1 int)") + is2 := do.InfoSchema() + require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion()) + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) +} + +func TestTxnContextInExplicitTxn(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + is1 := do.InfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + + tk.MustExec("begin") + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(2)") + }) + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) + + // info schema changed when txn not finish, the info schema in old txn should not change + tk2.MustExec("alter table t2 add column(c1 int)") + is2 := do.InfoSchema() + require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion()) + + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(2)") + }) + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("commit") + }) + + // the info schema in new txn should use the newest one + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2) + tk.MustExec("begin") + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(2)") + }) + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) +} + +func TestTxnContextBeginInUnfinishedTxn(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + is1 := do.InfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + tk.MustExec("begin") + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + tk2.MustExec("alter table t2 add column(c1 int)") + is2 := do.InfoSchema() + require.True(t, is2.SchemaMetaVersion() > is1.SchemaMetaVersion()) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + tk.MustExec("begin") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + tk.MustExec("rollback") +} + +func TestTxnContextWithAutocommitFalse(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + is1 := do.InfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + tk.MustExec("begin") + + tk.MustExec("set autocommit=0") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(2)") + }) + + // schema change should not affect because it is in txn + tk2.MustExec("alter table t2 add column(c1 int)") + + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) + tk.MustExec("rollback") +} + +func TestTxnContextInRC(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + + is1 := do.InfoSchema() + tk.MustExec("set tx_isolation = 'READ-COMMITTED'") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + tk.MustExec("begin pessimistic") + + // schema change should not affect even in rc isolation + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk2.MustExec("alter table t2 add column(c1 int)") + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + // test for write + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("insert into t2 (id) values(2)") + }) + + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + tk2.MustExec("update t1 set v=11 where id=1") + + // test for select + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11")) + }) + + // test for select for update + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11")) + }) + + tk.MustExec("rollback") +} + +func TestTxnContextInPessimisticKeyConflict(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + is1 := do.InfoSchema() + + tk.MustExec("begin pessimistic") + + // trigger retry + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("update t1 set v=11 where id=1") + tk2.MustExec("alter table t2 add column(c1 int)") + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + path := append([]string{"assertTxnManagerAfterPessimisticLockErrorRetry"}, normalPathRecords...) + doWithCheckPath(t, se, path, func() { + tk.MustExec("update t1 set v=12 where id=1") + }) + + tk.MustExec("rollback") +} + +func TestTxnContextInOptimisticRetry(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_disable_txn_auto_retry=0") + se := tk.Session() + is1 := do.InfoSchema() + + tk.MustExec("begin optimistic") + + // trigger retry + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("update t1 set v=11 where id=1") + tk2.MustExec("alter table t2 add column(c1 int)") + + tk.MustExec("update t1 set v=12 where id=1") + + // check retry context + path := append([]string{"assertTxnManagerInRebuildPlan"}, normalPathRecords...) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + se.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, do.InfoSchema()) + doWithCheckPath(t, se, path, func() { + tk.MustExec("commit") + }) + + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 12")) +} + +func TestTxnContextForHistoricalRead(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + safePoint := "20160102-15:04:05 -0700" + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint)) + + is1 := do.InfoSchema() + tk.MustExec("set @a=now(6)") + // change schema + tk.MustExec("alter table t2 add column(c1 int)") + tk.MustExec("update t1 set v=11 where id=1") + + tk.MustExec("set @@tidb_snapshot=@a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 10")) + }) + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tidb_snapshot=''") + tk.MustExec("begin") + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11")) + }) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11")) + }) + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tidb_snapshot=@a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11")) + }) + + tk.MustExec("rollback") +} + +func TestTxnContextForStaleRead(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + safePoint := "20160102-15:04:05 -0700" + tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint)) + + is1 := do.InfoSchema() + tk.MustExec("set @a=now(6)") + time.Sleep(time.Millisecond * 1200) + + // change schema + tk.MustExec("alter table t2 add column(c1 int)") + tk.MustExec("update t1 set v=11 where id=1") + + // @@tidb_read_staleness + tk.MustExec("set @@tidb_read_staleness=-1") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 as of timestamp @a").Check(testkit.Rows("1 10")) + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tidb_read_staleness=''") + + // select ... as of ... + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 as of timestamp @a").Check(testkit.Rows("1 10")) + }) + + // @@tx_read_ts + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=@a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11")) + }) + + // txn begin with @tx_read_ts + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=@a") + tk.MustExec("begin") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + tk.MustExec("rollback") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11")) + }) + + // txn begin ... as of ... + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("start transaction read only as of timestamp @a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 10")) + }) + tk.MustExec("rollback") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1").Check(testkit.Rows("1 11")) + }) +} + +func TestTxnContextForPrepareExecute(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + stmtID, _, _, err := se.PrepareStmt("select * from t1 where id=1") + require.NoError(t, err) + + is1 := do.InfoSchema() + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + + // Test prepare/execute in SQL + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("prepare s from 'select * from t1 where id=1'") + }) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + }) + + // Test ExecutePreparedStmt + path := append([]string{"assertTxnManagerInPreparedStmtExec"}, normalPathRecords...) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + + // Test PlanCache + path = []string{"assertTxnManagerInCachedPlanExec", "assertTxnManagerInShortPointGetPlan"} + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + + // In txn + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("begin") + + //change schema + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("alter table t2 add column(c1 int)") + tk2.MustExec("update t1 set v=11 where id=1") + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("prepare s from 'select * from t1 where id=1'") + }) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + }) + path = append([]string{"assertTxnManagerInPreparedStmtExec"}, normalPathRecords...) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + + tk.MustExec("rollback") +} + +func TestTxnContextForStaleReadInPrepare(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + is1 := do.InfoSchema() + tk.MustExec("set @a=now(6)") + tk.MustExec("prepare s1 from 'select * from t1 where id=1'") + tk.MustExec("prepare s2 from 'select * from t1 as of timestamp @a where id=1 '") + + stmtID1, _, _, err := se.PrepareStmt("select * from t1 where id=1") + require.NoError(t, err) + + stmtID2, _, _, err := se.PrepareStmt("select * from t1 as of timestamp @a where id=1 ") + require.NoError(t, err) + + //change schema + tk.MustExec("use test") + tk.MustExec("alter table t2 add column(c1 int)") + tk.MustExec("update t1 set v=11 where id=1") + + tk.MustExec("set @@tx_read_ts=@a") + stmtID3, _, _, err := se.PrepareStmt("select * from t1 where id=1 ") + require.NoError(t, err) + tk.MustExec("set @@tx_read_ts=''") + + tk.MustExec("set @@tx_read_ts=@a") + tk.MustExec("prepare s3 from 'select * from t1 where id=1 '") + tk.MustExec("set @@tx_read_ts=''") + + // tx_read_ts + tk.MustExec("set @@tx_read_ts=@a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + path := append([]string{"assertTxnManagerInPreparedStmtExec"}, normalPathRecords...) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=''") + + tk.MustExec("set @@tx_read_ts=@a") + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("execute s1") + }) + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("set @@tx_read_ts=''") + + // select ... as of ... + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID2, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("execute s2") + }) + + // plan cache for stmtID2 + doWithCheckPath(t, se, []string{"assertTxnManagerInCachedPlanExec", "assertTxnManagerInShortPointGetPlan"}, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID2, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + + // tx_read_ts in prepare + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID3, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustExec("execute s3") + }) + + // plan cache for stmtID3 + doWithCheckPath(t, se, []string{"assertTxnManagerInCachedPlanExec", "assertTxnManagerInShortPointGetPlan"}, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID3, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10")) + }) +} + +func TestTxnContextPreparedStmtWithForUpdate(t *testing.T) { + store, do, deferFunc := setupTxnContextTest(t) + defer deferFunc() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + se := tk.Session() + + is1 := do.InfoSchema() + + stmtID1, _, _, err := se.PrepareStmt("select * from t1 where id=1 for update") + require.NoError(t, err) + tk.MustExec("prepare s from 'select * from t1 where id=1 for update'") + tk.MustExec("begin pessimistic") + + //change schema + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("alter table t1 add column(c int default 100)") + tk2.MustExec("update t1 set v=11 where id=1") + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is1) + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("select * from t1 where id=1 for update").Check(testkit.Rows("1 11")) + }) + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, do.InfoSchema()) + path := append([]string{"assertTxnManagerInPreparedStmtExec"}, normalPathRecords...) + doWithCheckPath(t, se, path, func() { + rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil) + require.NoError(t, err) + tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 11 100")) + }) + + doWithCheckPath(t, se, normalPathRecords, func() { + tk.MustQuery("execute s").Check(testkit.Rows("1 11 100")) + }) + + se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil) + tk.MustExec("rollback") +}