From 1648ea2228309b92da548941d8d4d021eb1fe13a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 4 Dec 2018 19:38:42 +0800 Subject: [PATCH] *: Txn() function signature refactor and remove ActivePendingTxn() (#8327) --- ddl/column_change_test.go | 6 +++--- ddl/column_test.go | 12 +++++------ ddl/ddl_db_test.go | 16 +++++++------- ddl/ddl_test.go | 4 ++-- ddl/ddl_worker_test.go | 8 +++---- ddl/fail_test.go | 2 +- ddl/foreign_key_test.go | 6 +++--- ddl/index_change_test.go | 12 +++++------ ddl/reorg_test.go | 18 ++++++++-------- executor/adapter.go | 4 ++-- executor/admin.go | 2 +- executor/builder.go | 31 +++++++++++++++----------- executor/executor.go | 12 +++++------ executor/executor_test.go | 10 ++++----- executor/index_lookup_join.go | 7 ++++-- executor/prepared.go | 2 -- executor/simple.go | 10 ++++----- executor/write.go | 16 +++++++------- executor/write_test.go | 2 +- plan/physical_plan_test.go | 2 +- server/conn.go | 2 +- session/session.go | 20 +++-------------- session/session_test.go | 36 ++++++++++++++----------------- session/txn.go | 2 +- sessionctx/context.go | 10 ++++----- statistics/ddl.go | 12 +++++------ statistics/gc.go | 4 ++-- statistics/histogram.go | 2 +- statistics/update.go | 6 +++--- table/tables/tables.go | 26 +++++++++++----------- table/tables/tables_test.go | 8 +++---- util/admin/admin_test.go | 6 +++--- util/kvencoder/kv_encoder.go | 2 +- util/kvencoder/kv_encoder_test.go | 2 +- util/mock/context.go | 17 +-------------- 35 files changed, 155 insertions(+), 182 deletions(-) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index 4f61022f780df..1b7d70f52ca7f 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -73,7 +73,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { row := types.MakeDatums(1, 2) h, err := originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) var mu sync.Mutex @@ -125,7 +125,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { } mu.Unlock() } - err = hookCtx.Txn().Commit(context.Background()) + err = hookCtx.Txn(true).Commit(context.Background()) if err != nil { checkErr = errors.Trace(err) } @@ -177,7 +177,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Cont checkErr = errors.Trace(err) } } - err = hookCtx.Txn().Commit(context.TODO()) + err = hookCtx.Txn(true).Commit(context.TODO()) if err != nil { checkErr = errors.Trace(err) } diff --git a/ddl/column_test.go b/ddl/column_test.go index 7098d49beb5ca..f3fac4b463372 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -262,9 +262,9 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab if err != nil { return errors.Trace(err) } - defer ctx.Txn().Commit(context.Background()) + defer ctx.Txn(true).Commit(context.Background()) key := t.RecordKey(handle) - data, err := ctx.Txn().Get(key) + data, err := ctx.Txn(true).Get(key) if !isExist { if terror.ErrorEqual(err, kv.ErrNotExist) { return nil @@ -751,7 +751,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { handle, err := t.AddRecord(ctx, oldRow, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) newColName := "c4" @@ -814,7 +814,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.Stop() @@ -838,7 +838,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { _, err = t.AddRecord(ctx, append(row, types.NewDatum(defaultColValue)), false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) checkOK := false @@ -887,7 +887,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.Stop() diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index a7ce45e60fd7e..c62b59e8dd5cc 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -381,7 +381,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) { return } jobIDs := []int64{job.ID} - errs, err := admin.CancelJobs(hookCtx.Txn(), jobIDs) + errs, err := admin.CancelJobs(hookCtx.Txn(true), jobIDs) if err != nil { checkErr = errors.Trace(err) return @@ -391,7 +391,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) { checkErr = errors.Trace(errs[0]) return } - err = hookCtx.Txn().Commit(context.Background()) + err = hookCtx.Txn(true).Commit(context.Background()) if err != nil { checkErr = errors.Trace(err) } @@ -653,12 +653,12 @@ LOOP: // Make sure there is index with name c3_index. c.Assert(nidx, NotNil) c.Assert(nidx.Meta().ID, Greater, int64(0)) - ctx.Txn().Rollback() + ctx.Txn(true).Rollback() c.Assert(ctx.NewTxn(), IsNil) - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() - it, err := nidx.SeekFirst(ctx.Txn()) + it, err := nidx.SeekFirst(ctx.Txn(true)) c.Assert(err, IsNil) defer it.Close() @@ -754,9 +754,9 @@ func checkDelRangeDone(c *C, ctx sessionctx.Context, idx table.Index) { handles := make(map[int64]struct{}) c.Assert(ctx.NewTxn(), IsNil) - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() - it, err := idx.SeekFirst(ctx.Txn()) + it, err := idx.SeekFirst(ctx.Txn(true)) c.Assert(err, IsNil) defer it.Close() @@ -982,7 +982,7 @@ LOOP: i := 0 j := 0 ctx.NewTxn() - defer ctx.Txn().Rollback() + defer ctx.Txn(true).Rollback() err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []types.Datum, cols []*table.Column) (bool, error) { i++ diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index b54e76a182dbc..fadc496e1c9bc 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -63,7 +63,7 @@ func testNewDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, func getSchemaVer(c *C, ctx sessionctx.Context) int64 { err := ctx.NewTxn() c.Assert(err, IsNil) - m := meta.NewMeta(ctx.Txn()) + m := meta.NewMeta(ctx.Txn(true)) ver, err := m.GetSchemaVersion() c.Assert(err, IsNil) return ver @@ -91,7 +91,7 @@ func checkHistoryJob(c *C, job *model.Job) { } func checkHistoryJobArgs(c *C, ctx sessionctx.Context, id int64, args *historyJobArgs) { - t := meta.NewMeta(ctx.Txn()) + t := meta.NewMeta(ctx.Txn(true)) historyJob, err := t.GetHistoryDDLJob(id) c.Assert(err, IsNil) c.Assert(historyJob.BinlogInfo.FinishedTS, Greater, uint64(0)) diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index d750eed66197b..946702f1509d0 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -509,7 +509,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { row := types.MakeDatums(1, 2) _, err = originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} @@ -530,8 +530,8 @@ func (s *testDDLSuite) TestCancelJob(c *C) { checkErr = errors.Trace(err) return } - checkCancelState(hookCtx.Txn(), job, test) - err = hookCtx.Txn().Commit(context.Background()) + checkCancelState(hookCtx.Txn(true), job, test) + err = hookCtx.Txn(true).Commit(context.Background()) if err != nil { checkErr = errors.Trace(err) return @@ -559,7 +559,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { test = &tests[3] testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") - c.Assert(ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil) // for dropping index idxName := []interface{}{model.NewCIStr("idx")} diff --git a/ddl/fail_test.go b/ddl/fail_test.go index f89c4a9aae2ff..24e8da3579cd8 100644 --- a/ddl/fail_test.go +++ b/ddl/fail_test.go @@ -36,7 +36,7 @@ func (s *testColumnChangeSuite) TestFailBeforeDecodeArgs(c *C) { row := types.MakeDatums(1, 2) _, err = originTable.AddRecord(ctx, row, false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index 984d99696a1ec..8f9803ee7f90f 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -126,7 +126,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) // fix data race @@ -160,7 +160,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job := s.testCreateForeignKey(c, tblInfo, "c1_fk", []string{"c1"}, "t2", []string{"c1"}, ast.ReferOptionCascade, ast.ReferOptionSetNull) testCheckJobDone(c, d, job, true) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) mu.Lock() hErr := hookErr @@ -218,6 +218,6 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) } diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index 96caee931ac6a..72a3138ca5fa0 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -74,7 +74,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { _, err = originTable.AddRecord(ctx, types.MakeDatums(3, 3), false) c.Assert(err, IsNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} @@ -125,7 +125,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { d.SetHook(tc) testCreateIndex(c, ctx, d, s.dbInfo, originTable.Meta(), false, "c2", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") - c.Assert(ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(ctx.Txn(true).Commit(context.Background()), IsNil) d.Stop() prevState = model.StateNone var noneTable table.Table @@ -172,7 +172,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { func checkIndexExists(ctx sessionctx.Context, tbl table.Table, indexValue interface{}, handle int64, exists bool) error { idx := tbl.Indices()[0] - doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(), types.MakeDatums(indexValue), handle) + doesExist, _, err := idx.Exist(ctx.GetSessionVars().StmtCtx, ctx.Txn(true), types.MakeDatums(indexValue), handle) if err != nil { return errors.Trace(err) } @@ -322,7 +322,7 @@ func (s *testIndexChangeSuite) checkAddPublic(d *ddl, ctx sessionctx.Context, wr return errors.Trace(err) } } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context, publicTbl, writeTbl table.Table) error { @@ -362,7 +362,7 @@ func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx sessionctx.Context if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Context, writeTbl, delTbl table.Table) error { @@ -407,5 +407,5 @@ func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx sessionctx.Contex if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit(context.Background()) + return ctx.Txn(true).Commit(context.Background()) } diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 42324a6cd40eb..adc9a51d3b808 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -49,14 +49,14 @@ func (s *testDDLSuite) TestReorg(c *C) { err := ctx.NewTxn() c.Assert(err, IsNil) - ctx.Txn().Set([]byte("a"), []byte("b")) - err = ctx.Txn().Rollback() + ctx.Txn(true).Set([]byte("a"), []byte("b")) + err = ctx.Txn(true).Rollback() c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) - ctx.Txn().Set([]byte("a"), []byte("b")) - err = ctx.Txn().Commit(context.Background()) + ctx.Txn(true).Set([]byte("a"), []byte("b")) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) rowCount := int64(10) @@ -73,7 +73,7 @@ func (s *testDDLSuite) TestReorg(c *C) { } err = ctx.NewTxn() c.Assert(err, IsNil) - m := meta.NewMeta(ctx.Txn()) + m := meta.NewMeta(ctx.Txn(true)) rInfo := &reorgInfo{ Job: job, } @@ -89,12 +89,12 @@ func (s *testDDLSuite) TestReorg(c *C) { c.Assert(d.reorgCtx.rowCount, Equals, int64(0)) // Test whether reorgInfo's Handle is update. - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) err = ctx.NewTxn() c.Assert(err, IsNil) - m = meta.NewMeta(ctx.Txn()) + m = meta.NewMeta(ctx.Txn(true)) info, err1 := d.getReorgInfo(m, job, nil) c.Assert(err1, IsNil) c.Assert(info.Handle, Equals, handle) @@ -110,7 +110,7 @@ func (s *testDDLSuite) TestReorg(c *C) { return nil }) c.Assert(err, NotNil) - err = ctx.Txn().Commit(context.Background()) + err = ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) d.start(context.Background()) @@ -172,7 +172,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { c.Assert(err, IsNil) } - err := ctx.Txn().Commit(context.Background()) + err := ctx.Txn(true).Commit(context.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/executor/adapter.go b/executor/adapter.go index 003a502a2dd3a..77720bfd23e2d 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -235,7 +235,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (ast.RecordSet, error) { var txnStartTS uint64 if sctx.Txn(false).Valid() { - txnStartTS = sctx.Txn().StartTS() + txnStartTS = sctx.Txn(true).StartTS() } return &recordSet{ executor: e, @@ -265,7 +265,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co txnTS := uint64(0) // Don't active pending txn here. if sctx.Txn(false).Valid() { - txnTS = sctx.Txn().StartTS() + txnTS = sctx.Txn(true).StartTS() } a.LogSlowQuery(txnTS, err == nil) }() diff --git a/executor/admin.go b/executor/admin.go index 63e43a8cd8ec2..efc0646a27976 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -120,7 +120,7 @@ func (e *CheckIndexRangeExec) Open(ctx context.Context) error { func (e *CheckIndexRangeExec) buildDAGPB() (*tipb.DAGRequest, error) { dagReq := &tipb.DAGRequest{} - dagReq.StartTs = e.ctx.Txn().StartTS() + dagReq.StartTs = e.ctx.Txn(true).StartTS() dagReq.TimeZoneOffset = timeZoneOffset(e.ctx) sc := e.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) diff --git a/executor/builder.go b/executor/builder.go index e590c0f8de0f8..c94782e6a41ea 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/util/admin" "github.com/pingcap/tidb/util/ranger" tipb "github.com/pingcap/tipb/go-tipb" - log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -172,7 +171,7 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plan.CancelDDLJobs) Executor { baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()), jobIDs: v.JobIDs, } - e.errs, b.err = admin.CancelJobs(e.ctx.Txn(), e.jobIDs) + e.errs, b.err = admin.CancelJobs(e.ctx.Txn(true), e.jobIDs) if b.err != nil { b.err = errors.Trace(b.err) return nil @@ -206,7 +205,7 @@ func (b *executorBuilder) buildShowDDL(v *plan.ShowDDL) Executor { return nil } - ddlInfo, err := admin.GetDDLInfo(e.ctx.Txn()) + ddlInfo, err := admin.GetDDLInfo(e.ctx.Txn(true)) if err != nil { b.err = errors.Trace(err) return nil @@ -384,7 +383,11 @@ func (b *executorBuilder) buildChecksumTable(v *plan.ChecksumTable) Executor { tables: make(map[int64]*checksumContext), done: false, } - startTs := b.getStartTS() + startTs, err := b.getStartTS() + if err != nil { + b.err = err + return nil + } for _, t := range v.Tables { e.tables[t.TableInfo.ID] = newChecksumContext(t.DBInfo, t.TableInfo, startTs) } @@ -911,22 +914,22 @@ func (b *executorBuilder) buildTableDual(v *plan.PhysicalTableDual) Executor { return e } -func (b *executorBuilder) getStartTS() uint64 { +func (b *executorBuilder) getStartTS() (uint64, error) { if b.startTS != 0 { // Return the cached value. - return b.startTS + return b.startTS, nil } startTS := b.ctx.GetSessionVars().SnapshotTS - if startTS == 0 && b.ctx.Txn().Valid() { - startTS = b.ctx.Txn().StartTS() + if startTS == 0 && b.ctx.Txn(true).Valid() { + startTS = b.ctx.Txn(true).StartTS() } b.startTS = startTS if b.startTS == 0 { - // The the code should never run here if there is no bug. - log.Error(errors.ErrorStack(errors.Trace(ErrGetStartTS))) + // It may happen when getting start ts from PD fail, and Txn() is not valid. + return 0, errors.Trace(ErrGetStartTS) } - return startTS + return startTS, nil } func (b *executorBuilder) buildMemTable(v *plan.PhysicalMemTable) Executor { @@ -1195,8 +1198,12 @@ func (b *executorBuilder) buildAnalyze(v *plan.Analyze) Executor { } func (b *executorBuilder) constructDAGReq(plans []plan.PhysicalPlan) (*tipb.DAGRequest, bool, error) { + var err error dagReq := &tipb.DAGRequest{} - dagReq.StartTs = b.getStartTS() + dagReq.StartTs, err = b.getStartTS() + if err != nil { + return nil, false, errors.Trace(err) + } dagReq.TimeZoneOffset = timeZoneOffset(b.ctx) sc := b.ctx.GetSessionVars().StmtCtx dagReq.Flags = statementContextToFlags(sc) diff --git a/executor/executor.go b/executor/executor.go index 548dc6f02fb9e..2eeadb72a186b 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -315,12 +315,12 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - jobs, err := admin.GetDDLJobs(e.ctx.Txn()) + jobs, err := admin.GetDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } // TODO: need to return the job that the user needs. - historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn()) + historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } @@ -357,12 +357,12 @@ func (e *ShowDDLJobsExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return errors.Trace(err) } - jobs, err := admin.GetDDLJobs(e.ctx.Txn()) + jobs, err := admin.GetDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } - historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn()) + historyJobs, err := admin.GetHistoryDDLJobs(e.ctx.Txn(true)) if err != nil { return errors.Trace(err) } @@ -420,7 +420,7 @@ func (e *CheckTableExec) Next(ctx context.Context, chk *chunk.Chunk) error { return errors.Trace(err) } for _, idx := range tb.Indices() { - txn := e.ctx.Txn() + txn := e.ctx.Txn(true) err = admin.CompareIndexData(e.ctx, txn, tb, idx) if err != nil { return errors.Errorf("%v err:%v", t.Name, err) @@ -523,7 +523,7 @@ func (e *SelectLockExec) Next(ctx context.Context, chk *chunk.Chunk) error { if len(e.Schema().TblID2Handle) == 0 || e.Lock != ast.SelectLockForUpdate { return nil } - txn := e.ctx.Txn() + txn := e.ctx.Txn(true) keys := make([]kv.Key, 0, chk.NumRows()) iter := chunk.NewIterator4Chunk(chk) for id, cols := range e.Schema().TblID2Handle { diff --git a/executor/executor_test.go b/executor/executor_test.go index 2877b992b102f..4dd4d394083de 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -315,7 +315,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data))) } ctx.StmtCommit() - err1 = ctx.Txn().Commit(context.Background()) + err1 = ctx.Txn(true).Commit(context.Background()) c.Assert(err1, IsNil) r := tk.MustQuery(selectSQL) r.Check(testutil.RowsWithSep("|", tt.expected...)) @@ -2068,11 +2068,11 @@ func (s *testSuite) TestTiDBCurrentTS(c *C) { tk.MustExec("begin") rows := tk.MustQuery("select @@tidb_current_ts").Rows() tsStr := rows[0][0].(string) - c.Assert(tsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn().StartTS())) + c.Assert(tsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn(true).StartTS())) tk.MustExec("begin") rows = tk.MustQuery("select @@tidb_current_ts").Rows() newTsStr := rows[0][0].(string) - c.Assert(newTsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn().StartTS())) + c.Assert(newTsStr, Equals, fmt.Sprintf("%d", tk.Se.Txn(true).StartTS())) c.Assert(newTsStr, Not(Equals), tsStr) tk.MustExec("commit") tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0")) @@ -2091,7 +2091,7 @@ func (s *testSuite) TestSelectForUpdate(c *C) { tk.MustExec("drop table if exists t, t1") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") tk.MustExec("insert t values (12, 2, 3)") @@ -2714,7 +2714,7 @@ func (s *testSuite) TestCheckIndex(c *C) { c.Assert(err, IsNil) _, err = tb.AddRecord(s.ctx, recordVal2, false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) mockCtx := mock.NewContext() idx := tb.Indices()[0] diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 5c7bc303f84ac..796f18e1751d9 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -145,9 +145,12 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error { // The trick here is `getStartTS` will cache start ts in the dataReaderBuilder, // so even txn is destroyed later, the dataReaderBuilder could still use the // cached start ts to construct DAG. - e.innerCtx.readerBuilder.getStartTS() + _, err := e.innerCtx.readerBuilder.getStartTS() + if err != nil { + return errors.Trace(err) + } - err := e.children[0].Open(ctx) + err = e.children[0].Open(ctx) if err != nil { return errors.Trace(err) } diff --git a/executor/prepared.go b/executor/prepared.go index ae92d12a25c57..c44bc6e804813 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -202,8 +202,6 @@ func (e *ExecuteExec) Build() error { var err error if IsPointGetWithPKOrUniqueKeyByAutoCommit(e.ctx, e.plan) { err = e.ctx.InitTxnWithStartTS(math.MaxUint64) - } else { - err = e.ctx.ActivePendingTxn() } if err != nil { return errors.Trace(err) diff --git a/executor/simple.go b/executor/simple.go index 1dba6350e3f08..a14034dc33ddf 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -113,8 +113,8 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error { // the transaction with COMMIT or ROLLBACK. The autocommit mode then // reverts to its previous state. e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) - // Call ctx.Txn() to active pending txn. - e.ctx.Txn() + // Call ctx.Txn(true) to active pending txn. + e.ctx.Txn(true) return nil } @@ -126,9 +126,9 @@ func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error { sessVars := e.ctx.GetSessionVars() log.Debugf("con:%d execute rollback statement", sessVars.ConnectionID) sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false) - if e.ctx.Txn().Valid() { + if e.ctx.Txn(true).Valid() { e.ctx.GetSessionVars().TxnCtx.ClearDelta() - return e.ctx.Txn().Rollback() + return e.ctx.Txn(true).Rollback() } return nil } @@ -208,7 +208,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) + err := e.ctx.Txn(true).Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } diff --git a/executor/write.go b/executor/write.go index 83df7c570efb6..33e1296a02ac2 100644 --- a/executor/write.go +++ b/executor/write.go @@ -839,9 +839,9 @@ func (e *InsertExec) insertOneRow(row []types.Datum) (int64, error) { if err := e.checkBatchLimit(); err != nil { return 0, errors.Trace(err) } - e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) + e.ctx.Txn(true).SetOption(kv.PresumeKeyNotExists, nil) h, err := e.Table.AddRecord(e.ctx, row, false) - e.ctx.Txn().DelOption(kv.PresumeKeyNotExists) + e.ctx.Txn(true).DelOption(kv.PresumeKeyNotExists) if err != nil { return 0, errors.Trace(err) } @@ -859,7 +859,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) (types.Datu e.rowCount = 0 if !sessVars.ImportingData { - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(), kv.TempTxnMemBufCap) + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(true), kv.TempTxnMemBufCap) } // If `ON DUPLICATE KEY UPDATE` is specified, and no `IGNORE` keyword, @@ -892,10 +892,10 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) (types.Datu return nil, errors.Trace(err) } if len(e.OnDuplicate) == 0 && !e.IgnoreErr { - e.ctx.Txn().SetOption(kv.PresumeKeyNotExists, nil) + e.ctx.Txn(true).SetOption(kv.PresumeKeyNotExists, nil) } h, err := e.Table.AddRecord(e.ctx, row, false) - e.ctx.Txn().DelOption(kv.PresumeKeyNotExists) + e.ctx.Txn(true).DelOption(kv.PresumeKeyNotExists) if err == nil { if !sessVars.ImportingData { e.ctx.StmtAddDirtyTableOP(DirtyTableAddRow, e.Table.Meta().ID, h, row) @@ -946,7 +946,7 @@ func batchGetOldValues(ctx sessionctx.Context, t table.Table, handles []int64) ( for _, handle := range handles { batchKeys = append(batchKeys, t.RecordKey(handle)) } - values, err := kv.BatchGetValues(ctx.Txn(), batchKeys) + values, err := kv.BatchGetValues(ctx.Txn(true), batchKeys) if err != nil { return nil, errors.Trace(err) } @@ -1066,7 +1066,7 @@ func batchGetInsertKeys(ctx sessionctx.Context, t table.Table, newRows [][]types batchKeys = append(batchKeys, k.key) } } - values, err := kv.BatchGetValues(ctx.Txn(), batchKeys) + values, err := kv.BatchGetValues(ctx.Txn(true), batchKeys) if err != nil { return nil, nil, errors.Trace(err) } @@ -1086,7 +1086,7 @@ func (e *InsertExec) checkBatchLimit() error { } e.rowCount = 0 if !sessVars.ImportingData { - sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(), kv.TempTxnMemBufCap) + sessVars.GetWriteStmtBufs().BufStore = kv.NewBufferStore(e.ctx.Txn(true), kv.TempTxnMemBufCap) } } return nil diff --git a/executor/write_test.go b/executor/write_test.go index 7ba3a71579cc9..072ad19941228 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -1666,7 +1666,7 @@ func (s *testSuite) TestRebaseIfNeeded(c *C) { // which could simulate another TiDB adds a large auto ID. _, err = tbl.AddRecord(s.ctx, types.MakeDatums(30001, 2), false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) tk.MustExec(`update t set b = 3 where a = 30001;`) tk.MustExec(`insert into t (b) values (4);`) diff --git a/plan/physical_plan_test.go b/plan/physical_plan_test.go index 583a23b53c1ca..f5145f4d3cd75 100644 --- a/plan/physical_plan_test.go +++ b/plan/physical_plan_test.go @@ -739,7 +739,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnionScan(c *C) { err = se.NewTxn() c.Assert(err, IsNil) // Make txn not read only. - se.Txn().Set(kv.Key("AAA"), []byte("BBB")) + se.Txn(true).Set(kv.Key("AAA"), []byte("BBB")) se.StmtCommit() p, err := plan.Optimize(se, stmt, s.is) c.Assert(err, IsNil) diff --git a/server/conn.go b/server/conn.go index 56f9a68d45e88..f0871ffcb47c8 100644 --- a/server/conn.go +++ b/server/conn.go @@ -804,7 +804,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } } - txn := loadDataInfo.Ctx.Txn() + txn := loadDataInfo.Ctx.Txn(true) loadDataInfo.Ctx.StmtCommit() if err != nil { if txn != nil && txn.Valid() { diff --git a/session/session.go b/session/session.go index d3dfe67b4d527..b37d81825d6c9 100644 --- a/session/session.go +++ b/session/session.go @@ -795,7 +795,7 @@ func (s *session) Execute(ctx context.Context, sql string) (recordSets []ast.Rec if planCacheEnabled { schemaVersion := domain.GetDomain(s).InfoSchema().SchemaMetaVersion() - readOnly := s.Txn() == nil || s.Txn().IsReadOnly() + readOnly := s.Txn(true) == nil || s.Txn(true).IsReadOnly() cacheKey = plan.NewSQLCacheKey(s.sessionVars, sql, schemaVersion, readOnly) cacheValue, hitCache = plan.GlobalPlanCache.Get(cacheKey) @@ -984,8 +984,8 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { return nil } -func (s *session) Txn(opt ...bool) kv.Transaction { - if s.txn.pending() && len(opt) == 0 { +func (s *session) Txn(active bool) kv.Transaction { + if s.txn.pending() && active { // Transaction is lazy intialized. // PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn, // If Txn() is called later, wait for the future to get a valid txn. @@ -1376,20 +1376,6 @@ func (s *session) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(s.NewTxn()) } -// ActivePendingTxn implements Context.ActivePendingTxn interface. -func (s *session) ActivePendingTxn() error { - if s.txn.Valid() { - return nil - } - txnCap := s.getMembufCap() - // The transaction status should be pending. - if err := s.txn.changePendingToValid(txnCap); err != nil { - return errors.Trace(err) - } - s.sessionVars.TxnCtx.StartTS = s.txn.StartTS() - return nil -} - // InitTxnWithStartTS create a transaction with startTS. func (s *session) InitTxnWithStartTS(startTS uint64) error { if s.txn.Valid() { diff --git a/session/session_test.go b/session/session_test.go index f169de1a31366..bdc886dda54dc 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -144,10 +144,6 @@ func (s *testSessionSuite) TestForCoverage(c *C) { tk.MustExec("show processlist") _, err := tk.Se.FieldList("t") c.Check(err, IsNil) - - // Cover the error branch, althrough this never happen. - err = tk.Se.ActivePendingTxn() - c.Assert(err, NotNil) } func (s *testSessionSuite) TestErrorRollback(c *C) { @@ -253,7 +249,7 @@ func (s *testSessionSuite) TestRowLock(c *C) { tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") tk.MustExec("insert t values (12, 2, 3)") @@ -466,7 +462,7 @@ func (s *testSessionSuite) TestRetryCleanTxn(c *C) { history.Add(0, stmt, tk.Se.GetSessionVars().StmtCtx) _, err = tk.Exec("commit") c.Assert(err, NotNil) - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) c.Assert(tk.Se.GetSessionVars().InTxn(), IsFalse) } @@ -541,39 +537,39 @@ func (s *testSessionSuite) TestInTrans(c *C) { tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") tk.MustExec("insert t values ()") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("drop table if exists t;") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("commit") tk.MustExec("insert t values ()") tk.MustExec("set autocommit=0") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("commit") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("commit") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("set autocommit=1") tk.MustExec("drop table if exists t") tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)") tk.MustExec("begin") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("insert t values ()") - c.Assert(tk.Se.Txn().Valid(), IsTrue) + c.Assert(tk.Se.Txn(true).Valid(), IsTrue) tk.MustExec("rollback") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) } func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { @@ -582,7 +578,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) { tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("drop table if exists t") - c.Assert(tk.Se.Txn().Valid(), IsFalse) + c.Assert(tk.Se.Txn(true).Valid(), IsFalse) tk.MustExec("create table t (c1 int, c2 int, c3 int)") tk.MustExec("insert t values (11, 2, 3)") diff --git a/session/txn.go b/session/txn.go index b57059f9892cb..a0c364931b273 100644 --- a/session/txn.go +++ b/session/txn.go @@ -30,7 +30,7 @@ import ( // TxnState wraps kv.Transaction to provide a new kv.Transaction. // 1. It holds all statement related modification in the buffer before flush to the txn, // so if execute statement meets error, the txn won't be made dirty. -// 2. It's a lazy transaction, that means it's a txnFuture befort StartTS() is really need. +// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need. type TxnState struct { // States of a TxnState should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil diff --git a/sessionctx/context.go b/sessionctx/context.go index f8b18a48e8df9..c3e95c92f45ad 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -33,8 +33,10 @@ type Context interface { NewTxn() error // Txn returns the current transaction which is created before executing a statement. - // The returned kv.Transaction is not nil, but maybe pending or invalid. - Txn(...bool) kv.Transaction + // The returned kv.Transaction is not nil, but it maybe pending or invalid. + // If the active parameter is true, call this function will wait for the pending txn + // to become valid. + Txn(active bool) kv.Transaction // GetClient gets a kv.Client. GetClient() kv.Client @@ -57,10 +59,6 @@ type Context interface { // now just for load data and batch insert. RefreshTxnCtx(context.Context) error - // ActivePendingTxn receives the pending transaction from the transaction channel. - // It should be called right before we builds an executor. - ActivePendingTxn() error - // InitTxnWithStartTS initializes a transaction with startTS. // It should be called right before we builds an executor. InitTxnWithStartTS(startTS uint64) error diff --git a/statistics/ddl.go b/statistics/ddl.go index cb5ea99becc4d..9c1916c902930 100644 --- a/statistics/ddl.go +++ b/statistics/ddl.go @@ -54,18 +54,18 @@ func (h *Handle) insertTableStats2KV(info *model.TableInfo) error { if err != nil { return errors.Trace(err) } - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.ctx.Txn().StartTS(), info.ID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_meta (version, table_id) values(%d, %d)", h.ctx.Txn(true).StartTS(), info.ID)) if err != nil { return errors.Trace(err) } for _, col := range info.Columns { - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.ctx.Txn().StartTS())) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 0, %d, 0, %d)", info.ID, col.ID, h.ctx.Txn(true).StartTS())) if err != nil { return errors.Trace(err) } } for _, idx := range info.Indices { - _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.ctx.Txn().StartTS())) + _, err = exec.Execute(context.Background(), fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, version) values(%d, 1, %d, 0, %d)", info.ID, idx.ID, h.ctx.Txn(true).StartTS())) if err != nil { return errors.Trace(err) } @@ -83,7 +83,7 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) err return errors.Trace(err) } // First of all, we update the version. - _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn().StartTS(), tableID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn(true).StartTS(), tableID)) if err != nil { return errors.Trace(err) } @@ -113,13 +113,13 @@ func (h *Handle) insertColStats2KV(tableID int64, colInfo *model.ColumnInfo) err } if value.IsNull() { // If the adding column has default value null, all the existing rows have null value on the newly added column. - _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.ctx.Txn().StartTS(), tableID, colInfo.ID, count)) + _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, null_count) values (%d, %d, 0, %d, 0, %d)", h.ctx.Txn(true).StartTS(), tableID, colInfo.ID, count)) if err != nil { return errors.Trace(err) } } else { // If this stats exists, we insert histogram meta first, the distinct_count will always be one. - _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.ctx.Txn().StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count)) + _, err = exec.Execute(ctx, fmt.Sprintf("insert into mysql.stats_histograms (version, table_id, is_index, hist_id, distinct_count, tot_col_size) values (%d, %d, 0, %d, 1, %d)", h.ctx.Txn(true).StartTS(), tableID, colInfo.ID, int64(len(value.GetBytes()))*count)) if err != nil { return errors.Trace(err) } diff --git a/statistics/gc.go b/statistics/gc.go index 0d30b1739c634..fea6a47b5beff 100644 --- a/statistics/gc.go +++ b/statistics/gc.go @@ -106,7 +106,7 @@ func (h *Handle) deleteHistStatsFromKV(tableID int64, histID int64, isIndex int) return errors.Trace(err) } // First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything. - _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn().StartTS(), tableID)) + _, err = exec.Execute(context.Background(), fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn(true).StartTS(), tableID)) if err != nil { return errors.Trace(err) } @@ -132,7 +132,7 @@ func (h *Handle) DeleteTableStatsFromKV(id int64) error { return errors.Trace(err) } // We only update the version so that other tidb will know that this table is deleted. - sql := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn().StartTS(), id) + sql := fmt.Sprintf("update mysql.stats_meta set version = %d where table_id = %d ", h.ctx.Txn(true).StartTS(), id) _, err = exec.Execute(context.Background(), sql) if err != nil { return errors.Trace(err) diff --git a/statistics/histogram.go b/statistics/histogram.go index 819ee6d695c64..e4bc8718cdd9c 100644 --- a/statistics/histogram.go +++ b/statistics/histogram.go @@ -193,7 +193,7 @@ func SaveStatsToStorage(sctx sessionctx.Context, tableID int64, count int64, isI if err != nil { return errors.Trace(err) } - txn := sctx.Txn() + txn := sctx.Txn(true) version := txn.StartTS() var sql string // If the count is less than 0, then we do not want to update the modify count and count. diff --git a/statistics/update.go b/statistics/update.go index 05c49940a4dbd..7a1e196f0301c 100644 --- a/statistics/update.go +++ b/statistics/update.go @@ -200,9 +200,9 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (bo } var sql string if delta.Delta < 0 { - sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.ctx.Txn().StartTS(), -delta.Delta, delta.Count, id, -delta.Delta) + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count - %d, modify_count = modify_count + %d where table_id = %d and count >= %d", h.ctx.Txn(true).StartTS(), -delta.Delta, delta.Count, id, -delta.Delta) } else { - sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn().StartTS(), delta.Delta, delta.Count, id) + sql = fmt.Sprintf("update mysql.stats_meta set version = %d, count = count + %d, modify_count = modify_count + %d where table_id = %d", h.ctx.Txn(true).StartTS(), delta.Delta, delta.Count, id) } _, err = h.ctx.(sqlexec.SQLExecutor).Execute(ctx, sql) if err != nil { @@ -222,7 +222,7 @@ func (h *Handle) dumpTableStatColSizeToKV(id int64, delta variable.TableDelta) e if err != nil { return errors.Trace(err) } - version := h.ctx.Txn().StartTS() + version := h.ctx.Txn(true).StartTS() values := make([]string, 0, len(delta.ColSize)) for histID, deltaColSize := range delta.ColSize { if deltaColSize == 0 { diff --git a/table/tables/tables.go b/table/tables/tables.go index b9a17b97aa3fc..09e4cece7ab84 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -225,7 +225,7 @@ func (t *Table) FirstKey() kv.Key { // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. func (t *Table) UpdateRecord(ctx sessionctx.Context, h int64, oldData, newData []types.Datum, touched []bool) error { - txn := ctx.Txn() + txn := ctx.Txn(true) // TODO: reuse bs, like AddRecord does. bs := kv.NewBufferStore(txn, kv.DefaultTxnMembufCap) @@ -341,12 +341,12 @@ func adjustRowValuesBuf(writeBufs *variable.WriteStmtBufs, rowLen int) { // Just add the kv to transaction's membuf directly. func (t *Table) getRollbackableMemStore(ctx sessionctx.Context) kv.RetrieverMutator { if ctx.GetSessionVars().ImportingData { - return ctx.Txn() + return ctx.Txn(true) } bs := ctx.GetSessionVars().GetWriteStmtBufs().BufStore if bs == nil { - bs = kv.NewBufferStore(ctx.Txn(), kv.DefaultTxnMembufCap) + bs = kv.NewBufferStore(ctx.Txn(true), kv.DefaultTxnMembufCap) } else { bs.Reset() } @@ -377,7 +377,7 @@ func (t *Table) AddRecord(ctx sessionctx.Context, r []types.Datum, skipHandleChe } } - txn := ctx.Txn() + txn := ctx.Txn(true) sessVars := ctx.GetSessionVars() // when ImportingData or BatchCheck is true, // no needs to check the key constrains, so we names the variable skipCheck. @@ -471,7 +471,7 @@ func (t *Table) genIndexKeyStr(colVals []types.Datum) (string, error) { // addIndices adds data into indices. If any key is duplicated, returns the original handle. func (t *Table) addIndices(ctx sessionctx.Context, recordID int64, r []types.Datum, rm kv.RetrieverMutator, skipHandleCheck bool) (int64, error) { - txn := ctx.Txn() + txn := ctx.Txn(true) // Clean up lazy check error environment defer txn.DelOption(kv.PresumeKeyNotExistsError) skipCheck := ctx.GetSessionVars().ImportingData || ctx.GetSessionVars().StmtCtx.BatchCheck @@ -515,7 +515,7 @@ func (t *Table) addIndices(ctx sessionctx.Context, recordID int64, r []types.Dat func (t *Table) RowWithCols(ctx sessionctx.Context, h int64, cols []*table.Column) ([]types.Datum, error) { // Get raw row data from kv. key := t.RecordKey(h) - value, err := ctx.Txn().Get(key) + value, err := ctx.Txn(true).Get(key) if err != nil { return nil, errors.Trace(err) } @@ -654,7 +654,7 @@ func (t *Table) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, colIDs func (t *Table) removeRowData(ctx sessionctx.Context, h int64) error { // Remove row data. - err := ctx.Txn().Delete([]byte(t.RecordKey(h))) + err := ctx.Txn(true).Delete([]byte(t.RecordKey(h))) if err != nil { return errors.Trace(err) } @@ -666,14 +666,14 @@ func (t *Table) removeRowIndices(ctx sessionctx.Context, h int64, rec []types.Da for _, v := range t.DeletableIndices() { vals, err := v.FetchValues(rec, nil) if err != nil { - log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, ctx.Txn().StartTS(), h, rec) + log.Infof("remove row index %v failed %v, txn %d, handle %d, data %v", v.Meta(), err, ctx.Txn(true).StartTS(), h, rec) return errors.Trace(err) } - if err = v.Delete(ctx.GetSessionVars().StmtCtx, ctx.Txn(), vals, h); err != nil { + if err = v.Delete(ctx.GetSessionVars().StmtCtx, ctx.Txn(true), vals, h); err != nil { if v.Meta().State != model.StatePublic && kv.ErrNotExist.Equal(err) { // If the index is not in public state, we may have not created the index, // or already deleted the index, so skip ErrNotExist error. - log.Debugf("remove row index %v doesn't exist, txn %d, handle %d", v.Meta(), ctx.Txn().StartTS(), h) + log.Debugf("remove row index %v doesn't exist, txn %d, handle %d", v.Meta(), ctx.Txn(true).StartTS(), h) continue } return errors.Trace(err) @@ -702,7 +702,7 @@ func (t *Table) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator, func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column, fn table.RecordIterFunc) error { prefix := t.RecordPrefix() - it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext()) + it, err := ctx.Txn(true).Iter(startKey, prefix.PrefixNext()) if err != nil { return errors.Trace(err) } @@ -831,7 +831,7 @@ func (t *Table) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetStep bo // Seek implements table.Table Seek interface. func (t *Table) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) { seekKey := tablecodec.EncodeRowKeyWithHandle(t.ID, h) - iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext()) + iter, err := ctx.Txn(true).Iter(seekKey, t.RecordPrefix().PrefixNext()) if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) { // No more records in the table, skip to the end. return 0, false, nil @@ -910,7 +910,7 @@ func FindIndexByColName(t table.Table, name string) table.Index { // CheckHandleExists check whether recordID key exists. if not exists, return nil, // otherwise return kv.ErrKeyExists error. func CheckHandleExists(ctx sessionctx.Context, t table.Table, recordID int64) error { - txn := ctx.Txn() + txn := ctx.Txn(true) // Check key exists. recordKey := t.RecordKey(recordID) e := kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key 'PRIMARY'", recordID) diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index a056d7eec50ed..a2edcad98bff3 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -139,7 +139,7 @@ func (ts *testSuite) TestBasic(c *C) { func countEntriesWithPrefix(ctx sessionctx.Context, prefix []byte) (int, error) { cnt := 0 - err := util.ScanMetaWithPrefix(ctx.Txn(), prefix, func(k kv.Key, v []byte) bool { + err := util.ScanMetaWithPrefix(ctx.Txn(true), prefix, func(k kv.Key, v []byte) bool { cnt++ return true }) @@ -222,7 +222,7 @@ func (ts *testSuite) TestUniqueIndexMultipleNullEntries(c *C) { c.Assert(err, IsNil) _, err = tb.AddRecord(sctx, types.MakeDatums(2, nil), false) c.Assert(err, IsNil) - c.Assert(sctx.Txn().Rollback(), IsNil) + c.Assert(sctx.Txn(true).Rollback(), IsNil) _, err = ts.se.Execute(context.Background(), "drop table test.t") c.Assert(err, IsNil) } @@ -284,7 +284,7 @@ func (ts *testSuite) TestUnsignedPK(c *C) { c.Assert(err, IsNil) c.Assert(len(row), Equals, 2) c.Assert(row[0].Kind(), Equals, types.KindUint64) - c.Assert(sctx.Txn().Commit(context.Background()), IsNil) + c.Assert(sctx.Txn(true).Commit(context.Background()), IsNil) } func (ts *testSuite) TestIterRecords(c *C) { @@ -307,7 +307,7 @@ func (ts *testSuite) TestIterRecords(c *C) { }) c.Assert(err, IsNil) c.Assert(totalCount, Equals, 2) - c.Assert(sctx.Txn().Commit(context.Background()), IsNil) + c.Assert(sctx.Txn(true).Commit(context.Background()), IsNil) } func (ts *testSuite) TestTableFromMeta(c *C) { diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index 8e43a33740203..8cd441b0b2aad 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -228,7 +228,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, types.MakeDatums(1, 10, 11), false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(1), int64(10), int64(11))} record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(2), int64(20), int64(21))} @@ -241,7 +241,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, record2.Values, false) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) txn, err := s.store.Begin() c.Assert(err, IsNil) @@ -285,7 +285,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(err, IsNil) err = tb.RemoveRecord(s.ctx, 2, record2.Values) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(context.Background()), IsNil) + c.Assert(s.ctx.Txn(true).Commit(context.Background()), IsNil) } func newDiffRetError(prefix string, ra, rb *RecordData) string { diff --git a/util/kvencoder/kv_encoder.go b/util/kvencoder/kv_encoder.go index 852a81947ea29..c5c0e6b1e540d 100644 --- a/util/kvencoder/kv_encoder.go +++ b/util/kvencoder/kv_encoder.go @@ -120,7 +120,7 @@ func (e *kvEncoder) Encode(sql string, tableID int64) (kvPairs []KvPair, affecte } func (e *kvEncoder) getKvPairsInMemBuffer(tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { - txnMemBuffer := e.se.Txn().GetMemBuffer() + txnMemBuffer := e.se.Txn(true).GetMemBuffer() kvPairs = make([]KvPair, 0, txnMemBuffer.Len()) err = kv.WalkMemBuffer(txnMemBuffer, func(k kv.Key, v []byte) error { if bytes.HasPrefix(k, tablecodec.TablePrefix()) { diff --git a/util/kvencoder/kv_encoder_test.go b/util/kvencoder/kv_encoder_test.go index 2423767de6576..5bb1e42136897 100644 --- a/util/kvencoder/kv_encoder_test.go +++ b/util/kvencoder/kv_encoder_test.go @@ -86,7 +86,7 @@ func getExpectKvPairs(tkExpect *testkit.TestKit, sql string) []KvPair { tkExpect.MustExec("begin") tkExpect.MustExec(sql) kvPairsExpect := make([]KvPair, 0) - kv.WalkMemBuffer(tkExpect.Se.Txn().GetMemBuffer(), func(k kv.Key, v []byte) error { + kv.WalkMemBuffer(tkExpect.Se.Txn(true).GetMemBuffer(), func(k kv.Key, v []byte) error { kvPairsExpect = append(kvPairsExpect, KvPair{Key: k, Val: v}) return nil }) diff --git a/util/mock/context.go b/util/mock/context.go index 41426db6f75e0..390756ab22477 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -74,7 +74,7 @@ func (c *Context) GetSessionVars() *variable.SessionVars { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(...bool) kv.Transaction { +func (c *Context) Txn(bool) kv.Transaction { return &c.txn } @@ -135,21 +135,6 @@ func (c *Context) RefreshTxnCtx(ctx context.Context) error { return errors.Trace(c.NewTxn()) } -// ActivePendingTxn implements the sessionctx.Context interface. -func (c *Context) ActivePendingTxn() error { - if c.txn.Valid() { - return nil - } - if c.Store != nil { - txn, err := c.Store.Begin() - if err != nil { - return errors.Trace(err) - } - c.txn.Transaction = txn - } - return nil -} - // InitTxnWithStartTS implements the sessionctx.Context interface with startTS. func (c *Context) InitTxnWithStartTS(startTS uint64) error { if c.txn.Valid() {