Skip to content

Commit

Permalink
session: skip the SQL execution if transaction is aborted and reset a…
Browse files Browse the repository at this point in the history
…borted status in retry (#8942)
  • Loading branch information
jackysp authored Jan 11, 2019
1 parent 30e8c8d commit 73655f6
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
33 changes: 26 additions & 7 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,23 +514,42 @@ func (s *session) isRetryableError(err error) bool {
return kv.IsRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err)
}

func (s *session) retry(ctx context.Context, maxCnt uint) error {
connID := s.sessionVars.ConnectionID
if s.sessionVars.TxnCtx.ForUpdate {
return errForUpdateCantRetry.GenWithStackByArgs(connID)
func (s *session) checkTxnAborted(stmt sqlexec.Statement) error {
if s.txn.doNotCommit == nil {
return nil
}
s.sessionVars.RetryInfo.Retrying = true
// If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`,
// because they are used to finish the aborted transaction.
if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.CommitStmt); ok {
return nil
}
if _, ok := stmt.(*executor.ExecStmt).StmtNode.(*ast.RollbackStmt); ok {
return nil
}
return errors.New("current transaction is aborted, commands ignored until end of transaction block")
}

func (s *session) retry(ctx context.Context, maxCnt uint) (err error) {
var retryCnt uint
defer func() {
s.sessionVars.RetryInfo.Retrying = false
s.txn.changeToInvalid()
// retryCnt only increments on retryable error, so +1 here.
metrics.SessionRetry.Observe(float64(retryCnt + 1))
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
if err != nil {
s.rollbackOnError(ctx)
}
s.txn.changeToInvalid()
}()

connID := s.sessionVars.ConnectionID
s.sessionVars.RetryInfo.Retrying = true
if s.sessionVars.TxnCtx.ForUpdate {
err = errForUpdateCantRetry.GenWithStackByArgs(connID)
return err
}

nh := GetHistory(s)
var err error
var schemaVersion int64
sessVars := s.GetSessionVars()
orgStartTS := sessVars.TxnCtx.StartTS
Expand Down
41 changes: 38 additions & 3 deletions session/session_fail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,48 @@ func (s *testSessionSuite) TestFailStatementCommit(c *C) {

gofail.Disable("github.com/pingcap/tidb/session/mockStmtCommitError")

tk.MustQuery("select * from t").Check(testkit.Rows("1"))
tk.MustExec("insert into t values (3)")
tk.MustExec("insert into t values (4)")
_, err = tk.Exec("select * from t")
c.Assert(err, NotNil)
_, err = tk.Exec("insert into t values (3)")
c.Assert(err, NotNil)
_, err = tk.Exec("insert into t values (4)")
c.Assert(err, NotNil)
_, err = tk.Exec("commit")
c.Assert(err, NotNil)

tk.MustQuery(`select * from t`).Check(testkit.Rows())

tk.MustExec("insert into t values (1)")

tk.MustExec("begin")
tk.MustExec("insert into t values (2)")
tk.MustExec("commit")

tk.MustExec("begin")
tk.MustExec("insert into t values (3)")
tk.MustExec("rollback")

tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2"))
}

func (s *testSessionSuite) TestFailStatementCommitInRetry(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("create table t (id int)")

tk.MustExec("begin")
tk.MustExec("insert into t values (1)")
tk.MustExec("insert into t values (2),(3),(4),(5)")
tk.MustExec("insert into t values (6)")

gofail.Enable("github.com/pingcap/tidb/session/mockCommitError8942", `return(true)`)
gofail.Enable("github.com/pingcap/tidb/session/mockStmtCommitError", `return(true)`)
_, err := tk.Exec("commit")
c.Assert(err, NotNil)
gofail.Disable("github.com/pingcap/tidb/session/mockCommitError8942")
gofail.Disable("github.com/pingcap/tidb/session/mockStmtCommitError")

tk.MustExec("insert into t values (6)")
tk.MustQuery(`select * from t`).Check(testkit.Rows("6"))
}

func (s *testSessionSuite) TestGetTSFailDirtyState(c *C) {
Expand Down
4 changes: 4 additions & 0 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
var err error
var rs sqlexec.RecordSet
se := sctx.(*session)
err = se.checkTxnAborted(s)
if err != nil {
return nil, err
}
rs, err = s.Exec(ctx)
sessVars := se.GetSessionVars()
// All the history should be added here.
Expand Down
7 changes: 7 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,13 @@ func (st *TxnState) Commit(ctx context.Context) error {
}
return errors.Trace(st.doNotCommit)
}

// mockCommitError8942 is used for PR #8942.
// gofail: var mockCommitError8942 bool
// if mockCommitError8942 {
// return kv.ErrRetryable
// }

return errors.Trace(st.Transaction.Commit(ctx))
}

Expand Down

0 comments on commit 73655f6

Please sign in to comment.