Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: unify the management of transaction activation by TxnManager. #35679

Merged
merged 27 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func canSkipSchemaCheckerDDL(tp model.ActionType) bool {

// InfoSchema gets the latest information schema from domain.
func (do *Domain) InfoSchema() infoschema.InfoSchema {
if do.infoCache == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If mockContext can return a valid infoschema, can we revert this line?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still needed as some tests does not init domain propoerly.

// Return nil is for test purpose where domain is not well initialized for session context
return nil
}
return do.infoCache.GetLatest()
}

Expand Down
5 changes: 5 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/mock"
)

// InfoSchema is the interface used to retrieve the schema information.
Expand Down Expand Up @@ -353,6 +355,9 @@ func init() {
util.GetSequenceByName = func(is interface{}, schema, sequence model.CIStr) (util.SequenceTable, error) {
return GetSequenceByName(is.(InfoSchema), schema, sequence)
}
mock.MockInfoschema = func(tbList []*model.TableInfo) sessionctx.InfoschemaMetaVersion {
return MockInfoSchema(tbList)
}
}

// HasAutoIncrementColumn checks whether the table has auto_increment columns, if so, return true and the column name.
Expand Down
8 changes: 4 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,16 @@ type Transaction interface {
// If a key doesn't exist, there shouldn't be any corresponding entry in the result map.
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
IsPessimistic() bool
// CacheIndexName caches the index name.
// CacheTableInfo caches the index name.
// PresumeKeyNotExists will use this to help decode error message.
CacheTableInfo(id int64, info *model.TableInfo)
// GetIndexName returns the cached index name.
// GetTableInfo returns the cached index name.
// If there is no such index already inserted through CacheIndexName, it will return UNKNOWN.
GetTableInfo(id int64) *model.TableInfo

// set allowed options of current operation in each TiKV disk usage level.
// SetDiskFullOpt set allowed options of current operation in each TiKV disk usage level.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// clear allowed flag
// ClearDiskFullOpt clear allowed flag
ClearDiskFullOpt()

// GetMemDBCheckpoint gets the transaction's memDB checkpoint.
Expand Down
10 changes: 7 additions & 3 deletions session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ package session
import (
"bytes"
"context"
"fmt"
"strconv"
"testing"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -254,6 +257,8 @@ func TestAmendCollectAndGenMutations(t *testing.T) {
store: store,
sessionVars: variable.NewSessionVars(),
}
se.mu.values = make(map[fmt.Stringer]interface{})
domain.BindDomain(se, &domain.Domain{})
startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization}
for _, startState := range startStates {
endStatMap := ConstOpAddIndex[startState]
Expand Down Expand Up @@ -404,10 +409,9 @@ func TestAmendCollectAndGenMutations(t *testing.T) {

logutil.BgLogger().Info("[TEST]finish to write old txn data")
// Write data for this new transaction, its memory buffer will be used by schema amender.
txn, err := se.store.Begin()
err = sessiontxn.NewTxn(ctx, se)
require.NoError(t, err)
se.txn.changeInvalidToValid(txn)
txn, err = se.Txn(true)
txn, err := se.Txn(false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Txn(true) => Txn(false)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The txn now is already actiavated in sessiontxn.NewTxn(ctx, se), so whether true or false here has no difference.

require.NoError(t, err)
var checkKeys []kv.Key
for i, key := range mutations.GetKeys() {
Expand Down
89 changes: 6 additions & 83 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2469,85 +2469,8 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !active {
return &s.txn, nil
}
if !s.txn.validOrPending() {
return &s.txn, errors.AddStack(kv.ErrInvalidTxn)
}
if s.txn.pending() {
defer func(begin time.Time) {
s.sessionVars.DurationWaitTS = time.Since(begin)
}(time.Now())
// Transaction is lazy initialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
if err := s.txn.changePendingToValid(s.currentCtx); err != nil {
logutil.BgLogger().Error("active transaction fail",
zap.Error(err))
s.txn.cleanup()
s.sessionVars.TxnCtx.StartTS = 0
return &s.txn, err
}
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
if s.sessionVars.TxnCtx.IsPessimistic {
s.txn.SetOption(kv.Pessimistic, true)
}
if !s.sessionVars.IsAutocommit() && s.sessionVars.SnapshotTS == 0 {
s.sessionVars.SetInTxn(true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
s.txn.SetVars(s.sessionVars.KVVars)
readReplicaType := s.sessionVars.GetReplicaRead()
if readReplicaType.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, readReplicaType)
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.GetSessionVars().StmtCtx.WeakConsistency {
s.txn.SetOption(kv.IsolationLevel, kv.RC)
}
setTxnAssertionLevel(&s.txn, s.sessionVars.AssertionLevel)
}
return &s.txn, nil
}

// isTxnRetryable (if returns true) means the transaction could retry.
// If the transaction is in pessimistic mode, do not retry.
// If the session is already in transaction, enable retry or internal SQL could retry.
// If not, the transaction could always retry, because it should be auto committed transaction.
// Anyway the retry limit is 0, the transaction could not retry.
func (s *session) isTxnRetryable() bool {
sessVars := s.sessionVars

// The pessimistic transaction no need to retry.
if sessVars.TxnCtx.IsPessimistic {
return false
}

// If retry limit is 0, the transaction could not retry.
if sessVars.RetryLimit == 0 {
return false
}

// When `@@tidb_snapshot` is set, it is a ready-only statement and will not cause the errors that should retry a transaction in optimistic mode.
if sessVars.SnapshotTS != 0 {
return false
}

// If the session is not InTxn, it is an auto-committed transaction.
// The auto-committed transaction could always retry.
if !sessVars.InTxn() {
return true
}

// The internal transaction could always retry.
if sessVars.InRestrictedSQL {
return true
}

// If the retry is enabled, the transaction could retry.
if !sessVars.DisableTxnAutoRetry {
return true
}

return false
_, err := sessiontxn.GetTxnManager(s).ActivateTxn()
return &s.txn, err
}

func (s *session) NewTxn(ctx context.Context) error {
Expand Down Expand Up @@ -3204,11 +3127,11 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
return nil
}

func (s *session) GetPreparedTSFuture() oracle.Future {
if future := s.txn.txnFuture; future != nil {
return future.future
func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture {
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
if !s.txn.validOrPending() {
return nil
}
return nil
return &s.txn
}

// RefreshTxnCtx implements context.RefreshTxnCtx interface.
Expand Down
24 changes: 24 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,30 @@ func (txn *LazyTxn) KeysNeedToLock() ([]kv.Key, error) {
return keys, nil
}

// Wait converts pending txn to valid
func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Transaction, error) {
if !txn.validOrPending() {
return txn, errors.AddStack(kv.ErrInvalidTxn)
}
if txn.pending() {
defer func(begin time.Time) {
sctx.GetSessionVars().DurationWaitTS = time.Since(begin)
}(time.Now())

// Transaction is lazy initialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
if err := txn.changePendingToValid(ctx); err != nil {
logutil.BgLogger().Error("active transaction fail",
zap.Error(err))
txn.cleanup()
sctx.GetSessionVars().TxnCtx.StartTS = 0
return txn, err
}
}
return txn, nil
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}

func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix())
if !isTableKey {
Expand Down
9 changes: 9 additions & 0 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
Expand Down Expand Up @@ -134,6 +135,14 @@ func (m *txnManager) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePo
return m.ctxProvider.OnStmtErrorForNextAction(point, err)
}

// ActivateTxn decides to activate txn according to the parameter `active`
func (m *txnManager) ActivateTxn() (kv.Transaction, error) {
if m.ctxProvider == nil {
return nil, errors.AddStack(kv.ErrInvalidTxn)
}
return m.ctxProvider.ActivateTxn(nil)
SpadeA-Tang marked this conversation as resolved.
Show resolved Hide resolved
}

// OnStmtRetry is the hook that should be called when a statement retry
func (m *txnManager) OnStmtRetry(ctx context.Context) error {
if m.ctxProvider == nil {
Expand Down
10 changes: 8 additions & 2 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ type Context interface {
HasLockedTables() bool
// PrepareTSFuture uses to prepare timestamp by future.
PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error
// GetPreparedTSFuture returns the prepared ts future
GetPreparedTSFuture() oracle.Future
// GetPreparedTxnFuture returns the prepared ts future
GetPreparedTxnFuture() TxnFuture
// StoreIndexUsage stores the index usage information.
StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
// GetTxnWriteThroughputSLI returns the TxnWriteThroughputSLI.
Expand All @@ -176,6 +176,12 @@ type Context interface {
ReleaseAllAdvisoryLocks() int
}

// TxnFuture todo: add comments
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add comments now...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... Done.

type TxnFuture interface {
// Wait converts pending txn to valid
Wait(ctx context.Context, sctx Context) (kv.Transaction, error)
}

type basicCtxType int

func (t basicCtxType) String() string {
Expand Down
5 changes: 5 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
)
Expand Down Expand Up @@ -129,6 +130,8 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// ActivateTxn activates the transaction.
ActivateTxn(tp *EnterNewTxnType) (kv.Transaction, error)
}

// TxnManager is an interface providing txn context management in session
Expand Down Expand Up @@ -158,6 +161,8 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// ActivateTxn activates the transaction.
ActivateTxn() (kv.Transaction, error)
}

// NewTxn starts a new optimistic and active txn, it can be used for the below scenes:
Expand Down
Loading