txn: Provide PessimisticRCTxnContextProvider for RC isolation (#34702)
close #34746
lcwangchao authored Jun 2, 2022
1 parent 7342460 commit 22e9f4d
Showing 19 changed files with 1,085 additions and 140 deletions.
1 change: 0 additions & 1 deletion executor/adapter.go
@@ -759,7 +759,6 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
seCtx.GetSessionVars().TxnCtx.LastRcReadTs = newForUpdateTS
return nil
}

46 changes: 2 additions & 44 deletions executor/builder.go
@@ -62,7 +62,6 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
@@ -71,7 +70,6 @@ import (
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

var (
@@ -1584,11 +1582,6 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return
// the current timestamp in RC isolation
func (b *executorBuilder) getReadTS() (uint64, error) {
// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
// ensure the correct value is returned even if the `snapshotTS` field is already set by other
// logic. However, `IndexLookUpMergeJoin` and `IndexLookUpHashJoin` require caching the
// snapshotTS and may even use it after the txn has been destroyed. In this case, mark
// `snapshotTSCached` to skip `refreshForUpdateTSForRC`.
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
@@ -1604,12 +1597,6 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
return snapshotTS, nil
}

if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
if err := b.refreshForUpdateTSForRC(); err != nil {
return 0, err
}
}

if b.snapshotTS != 0 {
b.snapshotTSCached = true
// Return the cached value.
@@ -2229,41 +2216,12 @@ func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.Physi
// The Repeatable Read transaction uses the Read Committed level to read data for writing (insert, update, delete, select for update),
// We should always update/refresh the for-update-ts no matter the isolation level is RR or RC.
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
return b.refreshForUpdateTSForRC()
_, err = sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}
return UpdateForUpdateTS(b.ctx, 0)
}

// refreshForUpdateTSForRC is used to refresh the for-update-ts for reading data at read consistency level in pessimistic transaction.
// It could use the cached tso from the statement future to avoid fetching the tso many times.
func (b *executorBuilder) refreshForUpdateTSForRC() error {
defer func() {
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
}()
// The first time read-consistency read is executed and `RcReadCheckTS` is enabled, try to use
// the last valid ts as the for update read ts.
if b.ctx.GetSessionVars().StmtCtx.RCCheckTS {
rcReadTS := b.ctx.GetSessionVars().TxnCtx.LastRcReadTs
if rcReadTS == 0 {
rcReadTS = b.ctx.GetSessionVars().TxnCtx.StartTS
}
return UpdateForUpdateTS(b.ctx, rcReadTS)
}
future := b.ctx.GetSessionVars().TxnCtx.GetStmtFutureForRC()
if future == nil {
return nil
}
newForUpdateTS, waitErr := future.Wait()
if waitErr != nil {
logutil.BgLogger().Warn("wait tso failed",
zap.Uint64("startTS", b.ctx.GetSessionVars().TxnCtx.StartTS),
zap.Error(waitErr))
}
b.ctx.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
// If newForUpdateTS is 0, it will force to get a new for-update-ts from PD.
return UpdateForUpdateTS(b.ctx, newForUpdateTS)
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
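
Net effect of the executor/builder.go changes: the builder no longer refreshes the RC for-update timestamp itself; it asks the session's transaction manager, whose active context provider owns that logic (for pessimistic RC, the new PessimisticRCTxnContextProvider). Below is a minimal sketch of the new call pattern; only GetTxnManager and GetStmtForUpdateTS are taken from the diff above, while the helper name and surrounding code are illustrative.

import (
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
)

// stmtForUpdateTS is a hypothetical helper (not part of this commit) showing
// how callers now obtain the per-statement for-update TS: the transaction
// manager delegates to the active TxnContextProvider instead of the removed
// refreshForUpdateTSForRC path.
func stmtForUpdateTS(sctx sessionctx.Context) (uint64, error) {
	return sessiontxn.GetTxnManager(sctx).GetStmtForUpdateTS()
}
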
7 changes: 4 additions & 3 deletions planner/funcdep/extract_fd_test.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/hint"
"github.com/stretchr/testify/require"
@@ -218,7 +219,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
@@ -315,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
@@ -364,7 +365,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
9 changes: 7 additions & 2 deletions planner/optimize.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/logutil"
@@ -123,12 +124,16 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
sctx.PrepareTSFuture(ctx)
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
sctx.PrepareTSFuture(ctx)
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
return nil, nil, err
}

useBinding := sessVars.UsePlanBaselines
stmtNode, ok := node.(ast.StmtNode)
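
Both planner call sites above now call sessiontxn.WarmUpTxn and propagate its error, whereas the old sctx.PrepareTSFuture returned nothing. The sketch below is hedged: it assumes only the WarmUpTxn signature implied by these call sites (taking a session context and returning an error); the wrapper itself is illustrative.

import (
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
)

// warmUpBeforePlanning mirrors the pattern used in Optimize: trigger the
// transaction warm-up (e.g. the asynchronous TSO request) before plan
// building, and surface any warm-up error to the caller instead of ignoring it.
func warmUpBeforePlanning(sctx sessionctx.Context, build func() error) error {
	if err := sessiontxn.WarmUpTxn(sctx); err != nil {
		return err
	}
	return build()
}
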
26 changes: 12 additions & 14 deletions session/session.go
@@ -2180,7 +2180,9 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
return
}

s.PrepareTSFuture(ctx)
if err = sessiontxn.WarmUpTxn(s); err != nil {
return
}
prepareExec := executor.NewPrepareExec(s, sql)
err = prepareExec.Next(ctx, nil)
if err != nil {
@@ -2287,15 +2289,19 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
case *plannercore.Update:
s.PrepareTSFuture(ctx)
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
if err = sessiontxn.WarmUpTxn(s); err == nil {
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
}
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
s.PrepareTSFuture(ctx)
err = sessiontxn.WarmUpTxn(s)
}

if err == nil {
resultSet, err = runStmt(ctx, s, stmt)
}
resultSet, err = runStmt(ctx, s, stmt)
default:
prepared.CachedPlan = nil
return nil, false, nil
@@ -3159,14 +3165,6 @@ func (s *session) PrepareTSFuture(ctx context.Context) {
// Prepare the transaction future if the transaction is invalid (at the beginning of the transaction).
txnFuture := s.getTxnFuture(ctx)
s.txn.changeInvalidToPending(txnFuture)
} else if s.txn.Valid() && s.GetSessionVars().IsPessimisticReadConsistency() {
// Prepare the statement future if the transaction is valid in RC transactions.
// If the `RCCheckTS` is used, try to use the last valid ts to read.
if s.GetSessionVars().StmtCtx.RCCheckTS {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
} else {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(s.getTxnFuture(ctx).future)
}
}
}

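
The RC statement-future bookkeeping deleted here (the SetStmtFutureForRC branch of PrepareTSFuture above, and refreshForUpdateTSForRC in executor/builder.go earlier) is the logic the new PessimisticRCTxnContextProvider is meant to own. Its implementation is not shown in the loaded part of the diff; the sketch below is only a plausible reconstruction of that flow from the removed code, and the helper name is assumed.

import (
	"github.com/tikv/client-go/v2/oracle"
)

// rcStmtTS reconstructs, purely for illustration, what refreshForUpdateTSForRC
// used to do: wait on the per-statement TSO future if one was prepared, and
// fall back to zero (which forces a fresh TS from PD) when no future exists or
// waiting on it fails.
func rcStmtTS(stmtFuture oracle.Future) uint64 {
	if stmtFuture == nil {
		return 0 // no prepared future; the caller fetches a new for-update TS
	}
	ts, err := stmtFuture.Wait()
	if err != nil {
		// The old code only logged this error and then forced a new TS from PD.
		return 0
	}
	return ts
}
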
19 changes: 4 additions & 15 deletions session/txn.go
@@ -24,7 +24,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
@@ -34,6 +33,7 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sli"
@@ -508,20 +508,9 @@ func (tf *txnFuture) wait() (kv.Transaction, error) {
}

func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("session.getTxnFuture", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oracleStore := s.store.GetOracle()
var tsFuture oracle.Future
if s.sessionVars.LowResolutionTSO {
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
} else {
tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
}
ret := &txnFuture{future: tsFuture, store: s.store, txnScope: s.sessionVars.CheckAndGetTxnScope()}
scope := s.sessionVars.CheckAndGetTxnScope()
future := sessiontxn.NewOracleFuture(ctx, s, scope)
ret := &txnFuture{future: future, store: s.store, txnScope: scope}
failpoint.InjectContext(ctx, "mockGetTSFail", func() {
ret.future = txnFailFuture{}
})
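
getTxnFuture no longer builds the TSO future by hand: the tracing span and the LowResolutionTSO branch are gone, replaced by a call to sessiontxn.NewOracleFuture, presumably so the new RC provider can reuse the same helper. The sketch below assumes NewOracleFuture accepts a sessionctx.Context and returns an oracle.Future, as the replaced tsFuture suggests; the wrapper name is illustrative.

import (
	"context"

	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/tikv/client-go/v2/oracle"
)

// requestTSOFuture asks for a timestamp asynchronously for the given txn scope
// and returns a future to wait on later, mirroring the new getTxnFuture body.
func requestTSOFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future {
	return sessiontxn.NewOracleFuture(ctx, sctx, scope)
}
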
15 changes: 15 additions & 0 deletions session/txnmanager.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/isolation"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
)
@@ -113,6 +114,13 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType) error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp)
}
return nil
}

func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) sessiontxn.TxnContextProvider {
if r.Provider != nil {
return r.Provider
@@ -127,6 +135,13 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) se
return staleread.NewStalenessTxnContextProvider(m.sctx, r.StaleReadTS, nil)
}

if txnMode == ast.Pessimistic {
switch m.sctx.GetSessionVars().IsolationLevelForNewTxn() {
case ast.ReadCommitted:
return isolation.NewPessimisticRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
}
}

return &legacy.SimpleTxnContextProvider{
Sctx: m.sctx,
Pessimistic: txnMode == ast.Pessimistic,
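
This hunk is the core of the commit: when a new transaction is pessimistic and its isolation level resolves to READ COMMITTED, the manager installs the dedicated PessimisticRCTxnContextProvider instead of falling through to the legacy SimpleTxnContextProvider. A condensed sketch of that selection follows, using only names that appear in the diff; the standalone function and its parameters are illustrative.

import (
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/sessiontxn/isolation"
)

// pickProvider condenses the branch added to newProviderWithRequest: a
// pessimistic transaction at READ COMMITTED gets the new RC provider, while
// every other case keeps using the fallback (the legacy provider in the real
// code).
func pickProvider(sctx sessionctx.Context, txnMode string, causalConsistencyOnly bool,
	fallback sessiontxn.TxnContextProvider) sessiontxn.TxnContextProvider {
	if txnMode == ast.Pessimistic && sctx.GetSessionVars().IsolationLevelForNewTxn() == ast.ReadCommitted {
		return isolation.NewPessimisticRCTxnContextProvider(sctx, causalConsistencyOnly)
	}
	return fallback
}
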
43 changes: 19 additions & 24 deletions sessionctx/variable/session.go
@@ -52,7 +52,6 @@ import (
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/twmb/murmur3"
atomic2 "go.uber.org/atomic"
@@ -156,7 +155,6 @@ type TxnCtxNeedToRestore struct {
// TxnCtxNoNeedToRestore stores transaction variables which do not need to restored when rolling back to a savepoint.
type TxnCtxNoNeedToRestore struct {
forUpdateTS uint64
stmtFuture oracle.Future
Binlog interface{}
InfoSchema interface{}
History interface{}
@@ -199,9 +197,6 @@ type TxnCtxNoNeedToRestore struct {
// TemporaryTables is used to store transaction-specific information for global temporary tables.
// It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends.
TemporaryTables map[int64]tableutil.TempTable

// Last ts used by read-consistency read.
LastRcReadTs uint64
}

// SavepointRecord indicates a transaction's savepoint record.
Expand Down Expand Up @@ -333,16 +328,6 @@ func (tc *TransactionContext) SetForUpdateTS(forUpdateTS uint64) {
}
}

// SetStmtFutureForRC sets the stmtFuture.
func (tc *TransactionContext) SetStmtFutureForRC(future oracle.Future) {
tc.stmtFuture = future
}

// GetStmtFutureForRC gets the stmtFuture.
func (tc *TransactionContext) GetStmtFutureForRC() oracle.Future {
return tc.stmtFuture
}

// GetCurrentSavepoint gets TransactionContext's savepoint.
func (tc *TransactionContext) GetCurrentSavepoint() TxnCtxNeedToRestore {
tableDeltaMap := make(map[int64]TableDelta, len(tc.TableDeltaMap))
@@ -1630,6 +1615,25 @@ func (s *SessionVars) IsIsolation(isolation string) bool {
return s.TxnCtx.Isolation == isolation
}

// IsolationLevelForNewTxn returns the isolation level if we want to enter a new transaction
func (s *SessionVars) IsolationLevelForNewTxn() (isolation string) {
if s.InTxn() {
if s.txnIsolationLevelOneShot.state == oneShotSet {
isolation = s.txnIsolationLevelOneShot.value
}
} else {
if s.txnIsolationLevelOneShot.state == oneShotUse {
isolation = s.txnIsolationLevelOneShot.value
}
}

if isolation == "" {
isolation, _ = s.GetSystemVar(TxnIsolation)
}

return
}

// SetTxnIsolationLevelOneShotStateForNextTxn sets the txnIsolationLevelOneShot.state for next transaction.
func (s *SessionVars) SetTxnIsolationLevelOneShotStateForNextTxn() {
if isoLevelOneShot := &s.txnIsolationLevelOneShot; isoLevelOneShot.state != oneShotDef {
@@ -2504,12 +2508,3 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 {
}
return s.seekFactor
}

// IsRcCheckTsRetryable checks if the current error is retryable for `RcReadCheckTS` path.
func (s *SessionVars) IsRcCheckTsRetryable(err error) bool {
if err == nil {
return false
}
// The `RCCheckTS` flag of `stmtCtx` is set.
return s.RcReadCheckTS && s.StmtCtx.RCCheckTS && errors.ErrorEqual(err, kv.ErrWriteConflict)
}
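
IsolationLevelForNewTxn resolves the level the next transaction should start with: a one-shot SET TRANSACTION ISOLATION LEVEL value wins when it applies to that next transaction (already "set" while inside a transaction, or being "used" outside one), and otherwise the tx_isolation variable is consulted. A hedged usage sketch follows; only the method and ast.ReadCommitted come from the diff, and the predicate itself is illustrative.

import (
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/sessionctx/variable"
)

// nextTxnIsRC matches how the txn manager consumes the new method: check
// whether the upcoming transaction resolves to READ COMMITTED before choosing
// a transaction context provider.
func nextTxnIsRC(sessVars *variable.SessionVars) bool {
	return sessVars.IsolationLevelForNewTxn() == ast.ReadCommitted
}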
(Diffs for the remaining changed files were not loaded.)
