Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Introduce PessimisticRRTxnContextProvider for pessimistic repeatable read txn #35158

Merged
merged 21 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down
5 changes: 3 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
}

txnManger := sessiontxn.GetTxnManager(sctx)
if _, isolationReadContainTiKV := sessVars.IsolationReadEngines[kv.TiKV]; isolationReadContainTiKV {
var fp plannercore.Plan
if fpv, ok := sctx.Value(plannercore.PointPlanKey).(plannercore.PointPlanVal); ok {
Expand All @@ -124,14 +125,14 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}

Expand Down
11 changes: 8 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1925,6 +1925,11 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
}
return nil, err
}

if err = sessiontxn.GetTxnManager(s).AdviseOptimizeWithPlan(stmt.Plan); err != nil {
return nil, err
}

durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile
if s.isInternal() {
Expand Down Expand Up @@ -2181,7 +2186,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
return
}

if err = sessiontxn.WarmUpTxn(s); err != nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err != nil {
return
}
prepareExec := executor.NewPrepareExec(s, sql)
Expand Down Expand Up @@ -2290,14 +2295,14 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
case *plannercore.Update:
if err = sessiontxn.WarmUpTxn(s); err == nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err == nil {
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
}
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
err = sessiontxn.WarmUpTxn(s)
err = sessiontxn.GetTxnManager(s).AdviseWarmup()
}

if err == nil {
Expand Down
18 changes: 16 additions & 2 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,17 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType) error {
func (m *txnManager) AdviseWarmup() error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp)
return m.ctxProvider.AdviseWarmup()
}
return nil
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (m *txnManager) AdviseOptimizeWithPlan(plan interface{}) error {
if m.ctxProvider != nil {
return m.ctxProvider.AdviseOptimizeWithPlan(plan)
}
return nil
}
Expand All @@ -139,6 +147,12 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) se
switch m.sctx.GetSessionVars().IsolationLevelForNewTxn() {
case ast.ReadCommitted:
return isolation.NewPessimisticRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
case ast.Serializable:
// todo: Add pessimistic serializable transaction context provider
break
default:
// We use Repeatable read for all other cases.
return isolation.NewPessimisticRRTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
}
}

Expand Down
22 changes: 12 additions & 10 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,21 @@ const (
// AdviceWarmUp indicates to warm up the provider's inner state.
// For example, the provider can prefetch tso when it is advised.
AdviceWarmUp AdviceType = iota
// AdviceOptimizeWithPlan indicates to do some optimizations with the given plan
AdviceOptimizeWithPlan
)

// TxnAdvisable providers a collection of optimizations within transaction
type TxnAdvisable interface {
// AdviseWarmup provides warmup for inner state
AdviseWarmup() error
// AdviseOptimizeWithPlan providers optimization according to the plan
AdviseOptimizeWithPlan(plan interface{}) error
}

// TxnContextProvider provides txn context
type TxnContextProvider interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -119,12 +130,11 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider
Advise(tp AdviceType) error
}

// TxnManager is an interface providing txn context management in session
type TxnManager interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -145,9 +155,6 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider.
// For example, `AdviceWarmUp` can tell the provider to warm up its inner state.
Advise(tp AdviceType) error
}

// NewTxn starts a new optimistic and active txn, it can be used for the below scenes:
Expand All @@ -171,10 +178,5 @@ func NewTxnInStmt(ctx context.Context, sctx sessionctx.Context) error {
return GetTxnManager(sctx).OnStmtStart(ctx)
}

// WarmUpTxn gives the `AdviceWarmUp` advise to the provider
func WarmUpTxn(sctx sessionctx.Context) error {
return GetTxnManager(sctx).Advise(AdviceWarmUp)
}

// GetTxnManager returns the TxnManager object from session context
var GetTxnManager func(sctx sessionctx.Context) TxnManager
27 changes: 17 additions & 10 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table/temptable"
)

Expand Down Expand Up @@ -118,14 +119,6 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
return p.getStmtForUpdateTSFunc()
}

func (p *baseTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
}
return nil
}

func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context) error {
p.ctx = ctx
return nil
Expand Down Expand Up @@ -197,9 +190,23 @@ func (p *baseTxnContextProvider) isTidbSnapshotEnabled() bool {
return p.sctx.GetSessionVars().SnapshotTS != 0
}

func (p *baseTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
// isBeginStmtWithStaleRead indicates whether the current statement is `BeginStmt` type with stale read
// Because stale read will use `staleread.StalenessTxnContextProvider` for query, so if `staleread.IsStmtStaleness()`
// returns true in other providers, it means the current statement is `BeginStmt` with stale read
func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
return staleread.IsStmtStaleness(p.sctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where's the logic checking isBeiginStmt, the function name is a bit confusing.

}

// AdviseWarmup provides warmup for inner state
func (p *baseTxnContextProvider) AdviseWarmup() error {
if p.isBeginStmtWithStaleRead() {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
35 changes: 13 additions & 22 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,6 @@ func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error
return p.prepareStmt(false)
}

// Advise is used to give advice to provider
func (p *PessimisticRCTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
default:
return p.baseTxnContextProvider.Advise(tp)
}
}

func (p *PessimisticRCTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}

func (p *PessimisticRCTxnContextProvider) prepareStmtTS() {
if p.stmtTSFuture != nil {
return
Expand Down Expand Up @@ -216,3 +194,16 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr

return sessiontxn.ErrorAction(lockErr)
}

// AdviseWarmup provides warmup for inner state
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}
16 changes: 15 additions & 1 deletion sessiontxn/isolation/readcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -274,9 +275,22 @@ func TestRCProviderInitialize(t *testing.T) {
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")

// Case Pessimistic Autocommit
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
assert = inActiveRCTxnAssert(se)
assertAfterActive = activeRCTxnAssert(t, se, true)
require.NoError(t, se.PrepareTxnCtx(context.TODO()))
provider = assert.CheckAndGetProvider(t)
require.NoError(t, provider.OnStmtStart(context.TODO()))
ts, err = provider.GetStmtReadTS()
require.NoError(t, err)
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")
}

func TestTidbSnapshotVar(t *testing.T) {
func TestTidbSnapshotVarInRC(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

Expand Down
Loading