txn: Provide PessimisticRCTxnContextProvider for RC isolation #34702

Merged: 47 commits, merged on Jun 2, 2022
Changes shown are from 43 of the 47 commits.

Commits (47)
2a06641
txn: Provide `PessimisticRCTxnContextProvider` for RC isolation
lcwangchao May 16, 2022
6ef49e2
update
lcwangchao May 16, 2022
d9e4ab4
update
lcwangchao May 16, 2022
2b27055
update
lcwangchao May 16, 2022
7f3cdd4
update
lcwangchao May 16, 2022
2cac30a
update
lcwangchao May 17, 2022
c1d50ce
update
lcwangchao May 17, 2022
33017e8
update
lcwangchao May 17, 2022
cf3ca7c
update
lcwangchao May 17, 2022
9644438
Merge branch 'master' into rc_provider
lcwangchao May 17, 2022
8a22474
update
lcwangchao May 17, 2022
6bcaed7
update
lcwangchao May 17, 2022
38391fc
update
lcwangchao May 17, 2022
de87671
Merge branch 'master' into rc_provider
lcwangchao May 17, 2022
b45839c
update
lcwangchao May 17, 2022
4ae29f8
Merge branch 'master' into rc_provider
lcwangchao May 17, 2022
344012f
update
lcwangchao May 18, 2022
7ba70d0
update
lcwangchao May 18, 2022
9534210
Merge branch 'master' into rc_provider
lcwangchao May 18, 2022
67d3117
update
lcwangchao May 18, 2022
66d508a
update
lcwangchao May 18, 2022
283226e
update
lcwangchao May 18, 2022
438073e
update
lcwangchao May 18, 2022
0e3eab8
update
lcwangchao May 18, 2022
90e8241
Merge branch 'master' into rc_provider
lcwangchao May 18, 2022
76eadb7
Merge branch 'master' into rc_provider
lcwangchao May 19, 2022
c71ff52
update
lcwangchao May 20, 2022
be296b2
add base provider
lcwangchao May 27, 2022
ce409f9
Merge branch 'master' into rc_provider
lcwangchao May 27, 2022
6f5d6ef
update
lcwangchao May 27, 2022
71e8a93
update
lcwangchao May 27, 2022
fd93a26
Merge branch 'master' into rc_provider
lcwangchao May 27, 2022
c68a89a
update
lcwangchao May 27, 2022
754df88
Merge branch 'master' into rc_provider
lcwangchao May 31, 2022
d0c839e
update
lcwangchao May 31, 2022
c8b48e3
update
lcwangchao May 31, 2022
9694ad6
format
lcwangchao May 31, 2022
467962a
Merge branch 'master' into rc_provider
lcwangchao May 31, 2022
bd3ec9b
comments
lcwangchao Jun 1, 2022
6d8954b
Merge branch 'master' into rc_provider
lcwangchao Jun 1, 2022
f8d20d1
add tests
lcwangchao Jun 2, 2022
b9229ef
fix spell
lcwangchao Jun 2, 2022
f1a7861
Merge branch 'master' into rc_provider
lcwangchao Jun 2, 2022
4d9429f
format
lcwangchao Jun 2, 2022
9cf9485
Update sessiontxn/txn.go
lcwangchao Jun 2, 2022
1755975
update
lcwangchao Jun 2, 2022
e999728
Merge branch 'master' into rc_provider
lcwangchao Jun 2, 2022
1 change: 0 additions & 1 deletion executor/adapter.go
@@ -759,7 +759,6 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
seCtx.GetSessionVars().TxnCtx.LastRcReadTs = newForUpdateTS
return nil
}

46 changes: 2 additions & 44 deletions executor/builder.go
@@ -62,7 +62,6 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
@@ -71,7 +70,6 @@
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

var (
@@ -1584,11 +1582,6 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return
// the current timestamp in RC isolation
func (b *executorBuilder) getReadTS() (uint64, error) {
Contributor: Will this getReadTS be refactored in the future so that the ts is returned from some statement provider or context provider? There's also a GetStmtReadTS in the transaction manager.

Collaborator (Author): Yes, after the refactor is completed, this will be removed.
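As a hedged illustration of that answer (not part of this PR's diff), the refactored getReadTS in executor/builder.go could eventually collapse into a call to the transaction manager the reviewer mentions; the delegation below and its error handling are assumptions about the future shape, not code from this change:

// Sketch only: assumes the refactor routes statement read-ts decisions
// through sessiontxn.TxnManager.GetStmtReadTS.
func (b *executorBuilder) getReadTS() (uint64, error) {
    // The staleness assertion and snapshotTS caching seen in the current
    // implementation would move into the corresponding context providers.
    return sessiontxn.GetTxnManager(b.ctx).GetStmtReadTS()
}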

// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
// ensure the correct value is returned even if the `snapshotTS` field is already set by other
// logics. However, for `IndexLookUpMergeJoin` and `IndexLookUpHashJoin`, it requires caching the
// snapshotTS and may even use it after the txn has been destroyed. In this case, mark
// `snapshotTSCached` to skip `refreshForUpdateTSForRC`.
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
@@ -1604,12 +1597,6 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
return snapshotTS, nil
}

if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
if err := b.refreshForUpdateTSForRC(); err != nil {
return 0, err
}
}

if b.snapshotTS != 0 {
b.snapshotTSCached = true
// Return the cached value.
@@ -2229,41 +2216,12 @@ func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.Physi
// Repeatable Read transactions use the Read Committed level to read data for writing (insert, update, delete, select for update),
// so we should always update/refresh the for-update-ts no matter whether the isolation level is RR or RC.
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
return b.refreshForUpdateTSForRC()
_, err = sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}
return UpdateForUpdateTS(b.ctx, 0)
}

// refreshForUpdateTSForRC is used to refresh the for-update-ts for reading data at read consistency level in pessimistic transaction.
// It could use the cached tso from the statement future to avoid get tso many times.
func (b *executorBuilder) refreshForUpdateTSForRC() error {
Contributor: Ditto, would the updateForUpdateTSIfNeeded be refactored too in the future?

Collaborator (Author): Yes, I want to remove it in the future, but we should still keep it until all code is moved to providers, because for Insert, GetStmtForUpdateTS should be called to update the snapshot's ts for the later update operations. After the refactor, updateForUpdateTSIfNeeded will be much simpler: just calling TxnManager.GetStmtForUpdateTS will be enough.

Maybe another way is to add a new method TxnForWrite to TxnManager, and InsertExec should use it instead of sessionctx.Txn.
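As a rough sketch of the simplification described above (the final shape is not part of this PR and may differ), updateForUpdateTSIfNeeded could eventually lose its isolation-level branching and simply ask the transaction manager, mirroring the RC branch already introduced in this diff:

// Hypothetical post-refactor form in executor/builder.go; the checks that
// decide whether the statement needs a for-update ts at all are elided.
func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error {
    // ... keep the existing "does this plan need a for-update ts?" checks ...
    _, err := sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
    return err
}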

defer func() {
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
}()
// The first time read-consistency read is executed and `RcReadCheckTS` is enabled, try to use
// the last valid ts as the for update read ts.
if b.ctx.GetSessionVars().StmtCtx.RCCheckTS {
rcReadTS := b.ctx.GetSessionVars().TxnCtx.LastRcReadTs
if rcReadTS == 0 {
rcReadTS = b.ctx.GetSessionVars().TxnCtx.StartTS
}
return UpdateForUpdateTS(b.ctx, rcReadTS)
}
future := b.ctx.GetSessionVars().TxnCtx.GetStmtFutureForRC()
if future == nil {
return nil
}
newForUpdateTS, waitErr := future.Wait()
if waitErr != nil {
logutil.BgLogger().Warn("wait tso failed",
zap.Uint64("startTS", b.ctx.GetSessionVars().TxnCtx.StartTS),
zap.Error(waitErr))
}
b.ctx.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
// If newForUpdateTS is 0, it will force to get a new for-update-ts from PD.
return UpdateForUpdateTS(b.ctx, newForUpdateTS)
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
7 changes: 4 additions & 3 deletions planner/funcdep/extract_fd_test.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/parser"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/hint"
"github.com/stretchr/testify/require"
@@ -218,7 +219,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
@@ -315,7 +316,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
@@ -364,7 +365,7 @@ func TestFDSet_MakeOuterJoin(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.WarmUpTxn(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
9 changes: 7 additions & 2 deletions planner/optimize.go
@@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/logutil"
@@ -123,12 +124,16 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
sctx.PrepareTSFuture(ctx)
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
sctx.PrepareTSFuture(ctx)
if err := sessiontxn.WarmUpTxn(sctx); err != nil {
return nil, nil, err
}

useBinding := sessVars.UsePlanBaselines
stmtNode, ok := node.(ast.StmtNode)
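The call sites above only show the new entry point. Given the Advise method added to the transaction manager later in this diff, a plausible (but assumed) reading is that sessiontxn.WarmUpTxn simply forwards a warm-up advice to the current context provider; the AdviceWarmUp name and the exact wiring below are guesses, not code from this PR:

// Assumed shape of sessiontxn.WarmUpTxn, sketched from the Advise method
// introduced in session/txnmanager.go below.
func WarmUpTxn(sctx sessionctx.Context) error {
    return GetTxnManager(sctx).Advise(AdviceWarmUp)
}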
26 changes: 12 additions & 14 deletions session/session.go
@@ -2180,7 +2180,9 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
return
}

s.PrepareTSFuture(ctx)
if err = sessiontxn.WarmUpTxn(s); err != nil {
return
}
prepareExec := executor.NewPrepareExec(s, sql)
err = prepareExec.Next(ctx, nil)
if err != nil {
@@ -2287,15 +2289,19 @@ func (s *session) cachedPointPlanExec(ctx context.Context,
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
case *plannercore.Update:
s.PrepareTSFuture(ctx)
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
if err = sessiontxn.WarmUpTxn(s); err == nil {
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
}
case nil:
// cache is invalid
if prepareStmt.ForUpdateRead {
s.PrepareTSFuture(ctx)
err = sessiontxn.WarmUpTxn(s)
}

if err == nil {
resultSet, err = runStmt(ctx, s, stmt)
}
resultSet, err = runStmt(ctx, s, stmt)
default:
prepared.CachedPlan = nil
return nil, false, nil
@@ -3159,14 +3165,6 @@ func (s *session) PrepareTSFuture(ctx context.Context) {
// Prepare the transaction future if the transaction is invalid (at the beginning of the transaction).
txnFuture := s.getTxnFuture(ctx)
s.txn.changeInvalidToPending(txnFuture)
} else if s.txn.Valid() && s.GetSessionVars().IsPessimisticReadConsistency() {
// Prepare the statement future if the transaction is valid in RC transactions.
// If the `RCCheckTS` is used, try to use the last valid ts to read.
if s.GetSessionVars().StmtCtx.RCCheckTS {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
} else {
s.GetSessionVars().TxnCtx.SetStmtFutureForRC(s.getTxnFuture(ctx).future)
}
}
}

19 changes: 4 additions & 15 deletions session/txn.go
@@ -24,7 +24,6 @@ import (
"sync/atomic"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
@@ -34,6 +33,7 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sli"
@@ -508,20 +508,9 @@ func (tf *txnFuture) wait() (kv.Transaction, error) {
}

func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("session.getTxnFuture", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oracleStore := s.store.GetOracle()
var tsFuture oracle.Future
if s.sessionVars.LowResolutionTSO {
tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
} else {
tsFuture = oracleStore.GetTimestampAsync(ctx, &oracle.Option{TxnScope: s.sessionVars.CheckAndGetTxnScope()})
}
ret := &txnFuture{future: tsFuture, store: s.store, txnScope: s.sessionVars.CheckAndGetTxnScope()}
scope := s.sessionVars.CheckAndGetTxnScope()
future := sessiontxn.NewOracleFuture(ctx, s, scope)
ret := &txnFuture{future: future, store: s.store, txnScope: scope}
failpoint.InjectContext(ctx, "mockGetTSFail", func() {
ret.future = txnFailFuture{}
})
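For readers tracking where the deleted logic went: the tracing span and the low-resolution-TSO switch removed from getTxnFuture above presumably now live behind sessiontxn.NewOracleFuture. The reconstruction below is inferred from the deleted lines, not copied from this PR, and the real implementation may differ:

// Plausible reconstruction of sessiontxn.NewOracleFuture from the code
// removed above; treat every detail here as an assumption.
func NewOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future {
    if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
        span1 := span.Tracer().StartSpan("sessiontxn.NewOracleFuture", opentracing.ChildOf(span.Context()))
        defer span1.Finish()
        ctx = opentracing.ContextWithSpan(ctx, span1)
    }
    oracleStore := sctx.GetStore().GetOracle()
    option := &oracle.Option{TxnScope: scope}
    if sctx.GetSessionVars().LowResolutionTSO {
        return oracleStore.GetLowResolutionTimestampAsync(ctx, option)
    }
    return oracleStore.GetTimestampAsync(ctx, option)
}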
15 changes: 15 additions & 0 deletions session/txnmanager.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/isolation"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
)
@@ -113,6 +114,13 @@ func (m *txnManager) OnStmtRetry(ctx context.Context) error {
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(tp sessiontxn.AdviceType) error {
if m.ctxProvider != nil {
return m.ctxProvider.Advise(tp)
}
return nil
}

func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) sessiontxn.TxnContextProvider {
if r.Provider != nil {
return r.Provider
@@ -127,6 +135,13 @@ func (m *txnManager) newProviderWithRequest(r *sessiontxn.EnterNewTxnRequest) se
return staleread.NewStalenessTxnContextProvider(m.sctx, r.StaleReadTS, nil)
}

if txnMode == ast.Pessimistic {
switch m.sctx.GetSessionVars().IsolationLevelForNewTxn() {
case ast.ReadCommitted:
return isolation.NewPessimisticRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
}
}

return &legacy.SimpleTxnContextProvider{
Sctx: m.sctx,
Pessimistic: txnMode == ast.Pessimistic,
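A usage-level illustration of when the dispatch above selects the new provider (testkit-style, illustrative only; it is not one of the tests added by this PR): a pessimistic transaction whose isolation level for the next transaction resolves to READ COMMITTED.

// Illustrative snippet assuming a testkit.TestKit named tk; the comments
// map the statements onto the dispatch in newProviderWithRequest.
tk.MustExec("set session transaction isolation level read committed")
tk.MustExec("set @@tidb_txn_mode = 'pessimistic'")
tk.MustExec("begin") // txnMode == ast.Pessimistic and isolation == READ COMMITTED,
                     // so the manager builds a PessimisticRCTxnContextProvider
tk.MustExec("select * from t where id = 1 for update")
tk.MustExec("commit")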
43 changes: 19 additions & 24 deletions sessionctx/variable/session.go
@@ -52,7 +52,6 @@ import (
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/twmb/murmur3"
atomic2 "go.uber.org/atomic"
@@ -156,7 +155,6 @@ type TxnCtxNeedToRestore struct {
// TxnCtxNoNeedToRestore stores transaction variables which do not need to restored when rolling back to a savepoint.
type TxnCtxNoNeedToRestore struct {
forUpdateTS uint64
stmtFuture oracle.Future
Binlog interface{}
InfoSchema interface{}
History interface{}
@@ -199,9 +197,6 @@ type TxnCtxNoNeedToRestore struct {
// TemporaryTables is used to store transaction-specific information for global temporary tables.
// It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends.
TemporaryTables map[int64]tableutil.TempTable

// Last ts used by read-consistency read.
LastRcReadTs uint64
}

// SavepointRecord indicates a transaction's savepoint record.
@@ -333,16 +328,6 @@ func (tc *TransactionContext) SetForUpdateTS(forUpdateTS uint64) {
}
}

// SetStmtFutureForRC sets the stmtFuture .
func (tc *TransactionContext) SetStmtFutureForRC(future oracle.Future) {
tc.stmtFuture = future
}

// GetStmtFutureForRC gets the stmtFuture.
func (tc *TransactionContext) GetStmtFutureForRC() oracle.Future {
return tc.stmtFuture
}

// GetCurrentSavepoint gets TransactionContext's savepoint.
func (tc *TransactionContext) GetCurrentSavepoint() TxnCtxNeedToRestore {
tableDeltaMap := make(map[int64]TableDelta, len(tc.TableDeltaMap))
@@ -1630,6 +1615,25 @@ func (s *SessionVars) IsIsolation(isolation string) bool {
return s.TxnCtx.Isolation == isolation
}

// IsolationLevelForNewTxn returns the isolation level if we want to enter a new transaction
func (s *SessionVars) IsolationLevelForNewTxn() (isolation string) {
if s.InTxn() {
if s.txnIsolationLevelOneShot.state == oneShotSet {
isolation = s.txnIsolationLevelOneShot.value
}
} else {
if s.txnIsolationLevelOneShot.state == oneShotUse {
isolation = s.txnIsolationLevelOneShot.value
}
}

if isolation == "" {
isolation, _ = s.GetSystemVar(TxnIsolation)
}

return
}

// SetTxnIsolationLevelOneShotStateForNextTxn sets the txnIsolationLevelOneShot.state for next transaction.
func (s *SessionVars) SetTxnIsolationLevelOneShotStateForNextTxn() {
if isoLevelOneShot := &s.txnIsolationLevelOneShot; isoLevelOneShot.state != oneShotDef {
@@ -2504,12 +2508,3 @@ func (s *SessionVars) GetSeekFactor(tbl *model.TableInfo) float64 {
}
return s.seekFactor
}

// IsRcCheckTsRetryable checks if the current error is retryable for `RcReadCheckTS` path.
func (s *SessionVars) IsRcCheckTsRetryable(err error) bool {
if err == nil {
return false
}
// The `RCCheckTS` flag of `stmtCtx` is set.
return s.RcReadCheckTS && s.StmtCtx.RCCheckTS && errors.ErrorEqual(err, kv.ErrWriteConflict)
}
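To make the new IsolationLevelForNewTxn helper above concrete, the sequence below illustrates the assumed one-shot semantics it reads (SET TRANSACTION applies to the next transaction only); this is an explanatory sketch, not a test from this PR:

// Illustrative only; assumes a testkit.TestKit named tk.
tk.MustExec("set session transaction isolation level repeatable read")
tk.MustExec("set transaction isolation level read committed") // one-shot value
tk.MustExec("begin pessimistic") // IsolationLevelForNewTxn() resolves to READ COMMITTED here
tk.MustExec("commit")
tk.MustExec("begin pessimistic") // back to the session default, REPEATABLE READ
tk.MustExec("rollback")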