Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Introduce PessimisticRRTxnContextProvider for pessimistic repeatable read txn #35158

Merged
merged 21 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package planner
import (
"context"
"fmt"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessiontxn"
"math"
"math/rand"
"runtime/trace"
Expand All @@ -33,15 +35,13 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/planner/cascades"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/logutil"
Expand Down
5 changes: 5 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,11 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
}
return nil, err
}

if err = sessiontxn.OptimizeWithPlan(s, stmt.Plan); err != nil {
return nil, err
}

durCompile := time.Since(s.sessionVars.StartTime)
s.GetSessionVars().DurationCompile = durCompile
if s.isInternal() {
Expand Down
10 changes: 8 additions & 2 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType) error {
func (m *txnManager) Advise(tp sessiontxn.AdviceType, val ...any) error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp)
return m.ctxProvider.Advise(tp, val)
}
return nil
}
Expand All @@ -139,6 +139,12 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) se
switch m.sctx.GetSessionVars().IsolationLevelForNewTxn() {
case ast.ReadCommitted:
return isolation.NewPessimisticRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
case ast.Serializable:
// todo: Add pessimistic serializable transaction context provider
break
default:
// We use Repeatable read for all other cases.
return isolation.NewPessimisticRRTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
}
}

Expand Down
11 changes: 9 additions & 2 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ const (
// AdviceWarmUp indicates to warm up the provider's inner state.
// For example, the provider can prefetch tso when it is advised.
AdviceWarmUp AdviceType = iota
// AdviceOptimizeWithPlan indicates to do some optimizations with the given plan
AdviceOptimizeWithPlan
)

// TxnContextProvider provides txn context
Expand All @@ -120,7 +122,7 @@ type TxnContextProvider interface {
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider
Advise(tp AdviceType) error
Advise(tp AdviceType, val []any) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to avoid using any type but a concrete type in the interface definition, it would make it difficult to use and maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's hard to construct a concrete type here as the parameters of different advice type differs a lot. Could you give some suggestions?

Copy link
Contributor

@cfzjywxk cfzjywxk Jun 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on the design of what's planned to be passed inside and how the parameters would be used. The most commonly used way is to abstract some interface or trait to describe the meaning of the input parameters, for example, you could abstract a new type named TxnAdvice which contains both advice type and value, or abstract a new interface which defines what should be passed in as the advice parameters.
The main thing is to answer what's an advice and how to describe it, what's the design?, but not just leave something like whatever type is acceptable, perhaps some will work some would not especially defining interfaces.

In general, don't use the any or inteface{} types it's not maintainable and other people have no idea what should be passed or how to build another advice type.

}

// TxnManager is an interface providing txn context management in session
Expand All @@ -147,7 +149,7 @@ type TxnManager interface {
OnStmtRetry(ctx context.Context) error
// Advise is used to give advice to provider.
// For example, `AdviceWarmUp` can tell the provider to warm up its inner state.
Advise(tp AdviceType) error
Advise(tp AdviceType, val ...any) error
}

// NewTxn starts a new optimistic and active txn, it can be used for the below scenes:
Expand Down Expand Up @@ -176,5 +178,10 @@ func WarmUpTxn(sctx sessionctx.Context) error {
return GetTxnManager(sctx).Advise(AdviceWarmUp)
}

// OptimizeWithPlan gives the `AdviceOptimizeWithPlan` advise to the provider
func OptimizeWithPlan(sctx sessionctx.Context, plan any) error {
return GetTxnManager(sctx).Advise(AdviceOptimizeWithPlan, plan)
}

// GetTxnManager returns the TxnManager object from session context
var GetTxnManager func(sctx sessionctx.Context) TxnManager
13 changes: 11 additions & 2 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/table/temptable"
)

Expand Down Expand Up @@ -118,7 +119,7 @@ func (p *baseTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
return p.getStmtForUpdateTSFunc()
}

func (p *baseTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
func (p *baseTxnContextProvider) Advise(tp sessiontxn.AdviceType, _ []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
Expand Down Expand Up @@ -197,8 +198,16 @@ func (p *baseTxnContextProvider) isTidbSnapshotEnabled() bool {
return p.sctx.GetSessionVars().SnapshotTS != 0
}

// isBeginStmtWithStaleRead indicates whether the current statement is `BeginStmt` type with stale read
// Because stale read will use `staleread.StalenessTxnContextProvider` for query, so if `staleread.IsStmtStaleness()`
// returns true in other providers, it means the current statement is `BeginStmt` with stale read
func (p *baseTxnContextProvider) isBeginStmtWithStaleRead() bool {
return staleread.IsStmtStaleness(p.sctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where's the logic checking isBeiginStmt, the function name is a bit confusing.

}

func (p *baseTxnContextProvider) warmUp() error {
if p.isTidbSnapshotEnabled() {
if p.isBeginStmtWithStaleRead() {
// When executing `START TRANSACTION READ ONLY AS OF ...` no need to warmUp
return nil
}
return p.prepareTxn()
Expand Down
4 changes: 2 additions & 2 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ func (p *PessimisticRCTxnContextProvider) OnStmtRetry(ctx context.Context) error
}

// Advise is used to give advice to provider
func (p *PessimisticRCTxnContextProvider) Advise(tp sessiontxn.AdviceType) error {
func (p *PessimisticRCTxnContextProvider) Advise(tp sessiontxn.AdviceType, val []any) error {
switch tp {
case sessiontxn.AdviceWarmUp:
return p.warmUp()
default:
return p.baseTxnContextProvider.Advise(tp)
return p.baseTxnContextProvider.Advise(tp, val)
}
}

Expand Down
16 changes: 15 additions & 1 deletion sessiontxn/isolation/readcommitted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -274,9 +275,22 @@ func TestRCProviderInitialize(t *testing.T) {
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")

// Case Pessimistic Autocommit
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Store(true)
assert = inActiveRCTxnAssert(se)
assertAfterActive = activeRCTxnAssert(t, se, true)
require.NoError(t, se.PrepareTxnCtx(context.TODO()))
provider = assert.CheckAndGetProvider(t)
require.NoError(t, provider.OnStmtStart(context.TODO()))
ts, err = provider.GetStmtReadTS()
require.NoError(t, err)
assertAfterActive.Check(t)
require.Equal(t, ts, se.GetSessionVars().TxnCtx.StartTS)
tk.MustExec("rollback")
}

func TestTidbSnapshotVar(t *testing.T) {
func TestTidbSnapshotVarInRC(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

Expand Down
Loading