Skip to content

Commit

Permalink
Add interface TxnAdvisable
Browse files Browse the repository at this point in the history
Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang committed Jun 12, 2022
1 parent 0a129f9 commit f2ee63f
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 91 deletions.
6 changes: 3 additions & 3 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -365,7 +365,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
require.NoError(t, sessiontxn.GetTxnManager(tk.Session()).AdviseWarmup())
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down
5 changes: 3 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
}

txnManger := sessiontxn.GetTxnManager(sctx)
if _, isolationReadContainTiKV := sessVars.IsolationReadEngines[kv.TiKV]; isolationReadContainTiKV {
var fp plannercore.Plan
if fpv, ok := sctx.Value(plannercore.PointPlanKey).(plannercore.PointPlanVal); ok {
Expand All @@ -124,14 +125,14 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
if err := txnManger.AdviseWarmup(); err != nil {
return nil, nil, err
}

Expand Down
8 changes: 4 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,7 +1926,7 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
return nil, err
}

if err = sessiontxn.OptimizeWithPlan(s, stmt.Plan); err != nil {
if err = sessiontxn.GetTxnManager(s).AdviseOptimizeWithPlan(stmt.Plan); err != nil {
return nil, err
}

Expand Down Expand Up @@ -2186,7 +2186,7 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
return
}

if err = sessiontxn.WarmUpTxn(s); err != nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err != nil {
return
}
prepareExec := executor.NewPrepareExec(s, sql)
Expand Down Expand Up @@ -2295,14 +2295,14 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
case *plannercore.Update:
if err = sessiontxn.WarmUpTxn(s); err == nil {
if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err == nil {
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
}
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
err = sessiontxn.WarmUpTxn(s)
err = sessiontxn.GetTxnManager(s).AdviseWarmup()
}

if err == nil {
Expand Down
12 changes: 10 additions & 2 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,17 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType, val ...any) error {
func (m *txnManager) AdviseWarmup() error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp, val)
return m.ctxProvider.AdviseWarmup()
}
return nil
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (m *txnManager) AdviseOptimizeWithPlan(plan interface{}) error {
if m.ctxProvider != nil {
return m.ctxProvider.AdviseOptimizeWithPlan(plan)
}
return nil
}
Expand Down
25 changes: 10 additions & 15 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,17 @@ const (
AdviceOptimizeWithPlan
)

// TxnAdvisable providers a collection of optimizations within transaction
type TxnAdvisable interface {
// AdviseWarmup provides warmup for inner state
AdviseWarmup() error
// AdviseOptimizeWithPlan providers optimization according to the plan
AdviseOptimizeWithPlan(plan interface{}) error
}

// TxnContextProvider provides txn context
type TxnContextProvider interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -121,12 +130,11 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider
Advise(tp AdviceType, val []any) error
}

// TxnManager is an interface providing txn context management in session
type TxnManager interface {
TxnAdvisable
// GetTxnInfoSchema returns the information schema used by txn
GetTxnInfoSchema() infoschema.InfoSchema
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
Expand All @@ -147,9 +155,6 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider.
// For example, `AdviceWarmUp` can tell the provider to warm up its inner state.
Advise(tp AdviceType, val ...any) error
}

// NewTxn starts a new optimistic and active txn, it can be used for the below scenes:
Expand All @@ -173,15 +178,5 @@ func NewTxnInStmt(ctx context.Context, sctx sessionctx.Context) error {
return GetTxnManager(sctx).OnStmtStart(ctx)
}

// WarmUpTxn gives the `AdviceWarmUp` advise to the provider
func WarmUpTxn(sctx sessionctx.Context) error {
return GetTxnManager(sctx).Advise(AdviceWarmUp)
}

// OptimizeWithPlan gives the `AdviceOptimizeWithPlan` advise to the provider
func OptimizeWithPlan(sctx sessionctx.Context, plan any) error {
return GetTxnManager(sctx).Advise(AdviceOptimizeWithPlan, plan)
}

// GetTxnManager returns the TxnManager object from session context
var GetTxnManager func(sctx sessionctx.Context) TxnManager
16 changes: 7 additions & 9 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,6 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
return p.getStmtForUpdateTSFunc()
}

func (p *baseTxnContextProvider) Advise(tp sessiontxn.AdviceType, _ []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
}
return nil
}

func (p *baseTxnContextProvider) OnStmtStart(ctx context.Context) error {
p.ctx = ctx
return nil
Expand Down Expand Up @@ -205,10 +197,16 @@ func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
return staleread.IsStmtStaleness(p.sctx)
}

func (p *baseTxnContextProvider) warmUp() error {
// AdviseWarmup provides warmup for inner state
func (p *baseTxnContextProvider) AdviseWarmup() error {
if p.isBeginStmtWithStaleRead() {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
35 changes: 13 additions & 22 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,28 +107,6 @@ func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error
return p.prepareStmt(false)
}

// Advise is used to give advice to provider
func (p *PessimisticRCTxnContextProvider) Advise(tp sessiontxn.AdviceType, val []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
default:
return p.baseTxnContextProvider.Advise(tp, val)
}
}

func (p *PessimisticRCTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}

func (p *PessimisticRCTxnContextProvider) prepareStmtTS() {
if p.stmtTSFuture != nil {
return
Expand Down Expand Up @@ -216,3 +194,16 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(lockEr

return sessiontxn.ErrorAction(lockErr)
}

// AdviseWarmup provides warmup for inner state
func (p *PessimisticRCTxnContextProvider) AdviseWarmup() error {
if p.isTidbSnapshotEnabled() {
return nil
}

if err := p.prepareTxn(); err != nil {
return err
}
p.prepareStmtTS()
return nil
}
22 changes: 3 additions & 19 deletions sessiontxn/isolation/repeatable_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,33 +159,17 @@ func (p *PessimisticRRTxnContextProvider) OnStmtErrorForNextAction(point session
}
}

// Advise is used to give advice to provider
func (p *PessimisticRRTxnContextProvider) Advise(tp sessiontxn.AdviceType, val []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
case sessiontxn.AdviceOptimizeWithPlan:
return p.optimizeWithPlan(val)
default:
return p.baseTxnContextProvider.Advise(tp, val)
}
}

// optimizeWithPlan optimizes for update point get related execution.
// AdviseOptimizeWithPlan optimizes for update point get related execution.
// Use case: In for update point get related operations, we do not fetch ts from PD but use the last ts we fetched.
// We expect that the data that the point get acquires has not been changed.
// Benefit: Save the cost of acquiring ts from PD.
// Drawbacks: If the data has been changed since the ts we used, we need to retry.
func (p *PessimisticRRTxnContextProvider) optimizeWithPlan(val []any) (err error) {
func (p *PessimisticRRTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) {
if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
return nil
}

if len(val) == 0 {
return nil
}

plan, ok := val[0].(plannercore.Plan)
plan, ok := val.(plannercore.Plan)
if !ok {
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions sessiontxn/isolation/repeatable_reat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,15 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) {
se := tk.Session()
provider := initializeRepeatableReadProvider(t, tk)
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
txnManager := sessiontxn.GetTxnManager(se)

stmt, err := parser.New().ParseOneStmt("delete from t where id = 1", "", "")
require.NoError(t, err)
compareTs := getOracleTS(t, se)
compiler := executor.Compiler{Ctx: se}
execStmt, err := compiler.Compile(context.TODO(), stmt)
require.NoError(t, err)
err = sessiontxn.OptimizeWithPlan(se, execStmt.Plan)
err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan)
require.NoError(t, err)
ts, err := provider.GetStmtForUpdateTS()
require.NoError(t, err)
Expand All @@ -359,7 +360,7 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) {
compiler = executor.Compiler{Ctx: se}
execStmt, err = compiler.Compile(context.TODO(), stmt)
require.NoError(t, err)
err = sessiontxn.OptimizeWithPlan(se, execStmt.Plan)
err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan)
require.NoError(t, err)
ts, err = provider.GetStmtForUpdateTS()
require.NoError(t, err)
Expand All @@ -370,7 +371,7 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) {
compiler = executor.Compiler{Ctx: se}
execStmt, err = compiler.Compile(context.TODO(), stmt)
require.NoError(t, err)
err = sessiontxn.OptimizeWithPlan(se, execStmt.Plan)
err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan)
require.NoError(t, err)
ts, err = provider.GetStmtForUpdateTS()
require.NoError(t, err)
Expand All @@ -383,7 +384,7 @@ func TestOptimizeWithPlanInPessimisticRR(t *testing.T) {
compiler = executor.Compiler{Ctx: se}
execStmt, err = compiler.Compile(context.TODO(), stmt)
require.NoError(t, err)
err = sessiontxn.OptimizeWithPlan(se, execStmt.Plan)
err = txnManager.AdviseOptimizeWithPlan(execStmt.Plan)
require.NoError(t, err)
ts, err = provider.GetStmtForUpdateTS()
require.NoError(t, err)
Expand Down
28 changes: 20 additions & 8 deletions sessiontxn/legacy/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,15 @@ func (p *SimpleTxnContextProvider) OnStmtRetry(_ context.Context) error {
return nil
}

// Advise is used to give advice to provider
func (p *SimpleTxnContextProvider) Advise(tp sessiontxn.AdviceType, _ []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
p.Sctx.PrepareTSFuture(p.Ctx)
}
return nil
}
//// todo: remove
//// Advise is used to give advice to provider
//func (p *SimpleTxnContextProvider) Advise(tp sessiontxn.AdviceType, _ []any) error {
// switch tp {
// case sessiontxn.AdviceWarmUp:
// p.Sctx.PrepareTSFuture(p.Ctx)
// }
// return nil
//}

// activeTxn actives the txn
func (p *SimpleTxnContextProvider) activeTxn() (kv.Transaction, error) {
Expand All @@ -220,3 +221,14 @@ func (p *SimpleTxnContextProvider) activeTxn() (kv.Transaction, error) {
p.isTxnActive = true
return txn, nil
}

// AdviseWarmup provides warmup for inner state
func (p *SimpleTxnContextProvider) AdviseWarmup() error {
p.Sctx.PrepareTSFuture(p.Ctx)
return nil
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *SimpleTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
10 changes: 10 additions & 0 deletions sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,13 @@ func (p *StalenessTxnContextProvider) OnStmtRetry(_ context.Context) error {
func (p *StalenessTxnContextProvider) Advise(_ sessiontxn.AdviceType, _ []any) error {
return nil
}

// AdviseWarmup provides warmup for inner state
func (p *StalenessTxnContextProvider) AdviseWarmup() error {
return nil
}

// AdviseOptimizeWithPlan providers optimization according to the plan
func (p *StalenessTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}
4 changes: 2 additions & 2 deletions sessiontxn/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestEnterNewTxn(t *testing.T) {
})
require.NoError(t, err)
require.NoError(t, mgr.OnStmtStart(context.TODO()))
require.NoError(t, mgr.Advise(sessiontxn.AdviceWarmUp))
require.NoError(t, mgr.AdviseWarmup())
},
request: &sessiontxn.EnterNewTxnRequest{
Type: sessiontxn.EnterNewTxnWithBeginStmt,
Expand Down Expand Up @@ -270,7 +270,7 @@ func checkTxnBeforeStmt(t *testing.T, sctx sessionctx.Context) {
}

func checkStmtTxnAfterActive(t *testing.T, sctx sessionctx.Context) {
require.NoError(t, sessiontxn.WarmUpTxn(sctx))
require.NoError(t, sessiontxn.GetTxnManager(sctx).AdviseWarmup())
_, err := sctx.Txn(true)
require.NoError(t, err)
txn := checkBasicActiveTxn(t, sctx)
Expand Down
Loading

0 comments on commit f2ee63f

Please sign in to comment.