Skip to content

Commit

Permalink
*: Introduce PessimisticRRTxnContextProvider for pessimistic repeatab…
Browse files Browse the repository at this point in the history
…le read txn (#35158)

close #35129
  • Loading branch information
SpadeA-Tang authored Jun 13, 2022
1 parent 06737ec commit 738f681
Show file tree
Hide file tree
Showing 14 changed files with 791 additions and 65 deletions.
6 changes: 3 additions & 3 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down
5 changes: 3 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
}

txnManger := sessiontxn.GetTxnManager(sctx)
if _, isolationReadContainTiKV := sessVars.IsolationReadEngines[kv.TiKV]; isolationReadContainTiKV {
var fp plannercore.Plan
if fpv, ok := sctx.Value(plannercore.PointPlanKey).(plannercore.PointPlanVal); ok {
Expand All @@ -124,14 +125,14 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}

Expand Down
11 changes: 8 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,11 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
}
return nil, err
}

if err = sessiontxn.GetTxnManager(s).AdviseOptimizeWithPlan(stmt.Plan); err != nil {
return nil, err
}

durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile
if s.isInternal() {
Expand Down Expand Up @@ -2181,7 +2186,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
return
}

if err = sessiontxn.WarmUpTxn(s); err != nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err != nil {
return
}
prepareExec := executor.NewPrepareExec(s, sql)
Expand Down Expand Up @@ -2290,14 +2295,14 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
case *plannercore.Update:
if err = sessiontxn.WarmUpTxn(s); err == nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err == nil {
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
}
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
err = sessiontxn.WarmUpTxn(s)
err = sessiontxn.GetTxnManager(s).AdviseWarmup()
}

if err == nil {
Expand Down
18 changes: 16 additions & 2 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,17 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType) error {
func (m *txnManager) AdviseWarmup() error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp)
return m.ctxProvider.AdviseWarmup()
}
return nil
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (m *txnManager) AdviseOptimizeWithPlan(plan interface{}) error {
if m.ctxProvider != nil {
return m.ctxProvider.AdviseOptimizeWithPlan(plan)
}
return nil
}
Expand All @@ -139,6 +147,12 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) se
switch m.sctx.GetSessionVars().IsolationLevelForNewTxn() {
case ast.ReadCommitted:
return isolation.NewPessimisticRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
case ast.Serializable:
// todo: Add pessimistic serializable transaction context provider
break
default:
// We use Repeatable read for all other cases.
return isolation.NewPessimisticRRTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
}
}

Expand Down
22 changes: 12 additions & 10 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,21 @@ const (
// AdviceWarmUp indicates to warm up the provider's inner state.
// For example, the provider can prefetch tso when it is advised.
AdviceWarmUp AdviceType = iota
// AdviceOptimizeWithPlan indicates to do some optimizations with the given plan
AdviceOptimizeWithPlan
)

// TxnAdvisable providers a collection of optimizations within transaction
type TxnAdvisable interface {
// AdviseWarmup provides warmup for inner state
AdviseWarmup() error
// AdviseOptimizeWithPlan providers optimization according to the plan
AdviseOptimizeWithPlan(plan interface{}) error
}

// TxnContextProvider provides txn context
type TxnContextProvider interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -119,12 +130,11 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider
Advise(tp AdviceType) error
}

// TxnManager is an interface providing txn context management in session
type TxnManager interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -145,9 +155,6 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider.
// For example, `AdviceWarmUp` can tell the provider to warm up its inner state.
Advise(tp AdviceType) error
}

// NewTxn starts a new optimistic and active txn, it can be used for the below scenes:
Expand All @@ -171,10 +178,5 @@ func NewTxnInStmt(ctx context.Context, sctx sessionctx.Context) error {
return GetTxnManager(sctx).OnStmtStart(ctx)
}

// WarmUpTxn gives the `AdviceWarmUp` advise to the provider
func WarmUpTxn(sctx sessionctx.Context) error {
return GetTxnManager(sctx).Advise(AdviceWarmUp)
}

// GetTxnManager returns the TxnManager object from session context
var GetTxnManager func(sctx sessionctx.Context) TxnManager
27 changes: 17 additions & 10 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table/temptable"
)

Expand Down Expand Up @@ -118,14 +119,6 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
return p.getStmtForUpdateTSFunc()
}

func (p *baseTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
}
return nil
}

func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context) error {
p.ctx = ctx
return nil
Expand Down Expand Up @@ -197,9 +190,23 @@ func (p *baseTxnContextProvider) isTidbSnapshotEnabled() bool {
return p.sctx.GetSessionVars().SnapshotTS != 0
}

func (p *baseTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
// isBeginStmtWithStaleRead indicates whether the current statement is `BeginStmt` type with stale read
// Because stale read will use `staleread.StalenessTxnContextProvider` for query, so if `staleread.IsStmtStaleness()`
// returns true in other providers, it means the current statement is `BeginStmt` with stale read
func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
return staleread.IsStmtStaleness(p.sctx)
}

// AdviseWarmup provides warmup for inner state
func (p *baseTxnContextProvider) AdviseWarmup() error {
if p.isBeginStmtWithStaleRead() {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
35 changes: 13 additions & 22 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,6 @@ func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error
return p.prepareStmt(false)
}

// Advise is used to give advice to provider
func (p *PessimisticRCTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
default:
return p.baseTxnContextProvider.Advise(tp)
}
}

func (p *PessimisticRCTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}

func (p *PessimisticRCTxnContextProvider) prepareStmtTS() {
if p.stmtTSFuture != nil {
return
Expand Down Expand Up @@ -216,3 +194,16 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr

return sessiontxn.ErrorAction(lockErr)
}

// AdviseWarmup provides warmup for inner state
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}
16 changes: 15 additions & 1 deletion sessiontxn/isolation/readcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -274,9 +275,22 @@ func TestRCProviderInitialize(t *testing.T) {
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")

// Case Pessimistic Autocommit
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
assert = inActiveRCTxnAssert(se)
assertAfterActive = activeRCTxnAssert(t, se, true)
require.NoError(t, se.PrepareTxnCtx(context.TODO()))
provider = assert.CheckAndGetProvider(t)
require.NoError(t, provider.OnStmtStart(context.TODO()))
ts, err = provider.GetStmtReadTS()
require.NoError(t, err)
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")
}

func TestTidbSnapshotVar(t *testing.T) {
func TestTidbSnapshotVarInRC(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

Expand Down
Loading

0 comments on commit 738f681

Please sign in to comment.