Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: txnManager providers GetReadSnapshot and GetForUpdateSnapshot. #35788

Merged
merged 20 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan, startTs)
exec.Init(pointGetPlan, startTs, nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass a nil snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not have snapshot originally here.

a.PsStmt.Executor = exec
}
}
Expand Down
42 changes: 13 additions & 29 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,8 @@ type BatchPointGetExec struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
cacheTable kv.MemBuffer
snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand All @@ -104,35 +103,21 @@ func (e *BatchPointGetExec) Open(context.Context) error {
return err
}
e.txn = txn
var snapshot kv.Snapshot
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == e.snapshotTS {
// We can safely reuse the transaction snapshot if snapshotTS is equal to forUpdateTS.
// The snapshot may contain cache that can reduce RPC call.
snapshot = txn.GetSnapshot()
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
}
if e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
if e.cacheTable != nil {
snapshot = cacheTableSnapshot{snapshot, e.cacheTable}
}
if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
replicaReadType := e.ctx.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() && !e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.ReplicaRead, replicaReadType)
e.snapshot.SetOption(kv.ReplicaRead, replicaReadType)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope)
snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness)
e.snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think kv.TaskID , kv. ReplicaRead and kv.IsStalenessReadOnly can be set in provider. And kv.IsStalenessReadOnly should and should only be set in StalenessProvider.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will refactor it.

e.snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope)
e.snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness)
failpoint.Inject("assertBatchPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if replicaReadType.IsClosestRead() && assertScope != e.readReplicaScope {
Expand All @@ -141,26 +126,25 @@ func (e *BatchPointGetExec) Open(context.Context) error {
})

if replicaReadType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
e.snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.readReplicaScope,
},
})
}
setOptionForTopSQL(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot
setOptionForTopSQL(stmtCtx, e.snapshot)
var batchGetter kv.BatchGetter = e.snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
if e.lock {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot)
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, e.snapshot)
} else if lock != nil && (lock.Tp == model.TableLockRead || lock.Tp == model.TableLockReadOnly) && e.ctx.GetSessionVars().EnablePointGetCache {
batchGetter = newCacheBatchGetter(e.ctx, e.tblInfo.ID, snapshot)
batchGetter = newCacheBatchGetter(e.ctx, e.tblInfo.ID, e.snapshot)
} else {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, snapshot)
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, e.snapshot)
}
}
e.snapshot = snapshot
e.batchGetter = batchGetter
return nil
}
Expand Down
19 changes: 18 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,14 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return txnManager.GetStmtReadTS()
}

func (b *executorBuilder) getSnapshot() (kv.Snapshot, error) {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
txnManager := sessiontxn.GetTxnManager(b.ctx)
if b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt {
return txnManager.GetForUpdateSnapshot()
}
return txnManager.GetReadSnapshot()
}

func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {
switch v.DBName.L {
case util.MetricSchemaName.L:
Expand Down Expand Up @@ -4600,6 +4608,12 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
return nil
}

snapshot, err := b.getSnapshot()
if err != nil {
b.err = err
return nil
}

decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
Expand All @@ -4618,10 +4632,13 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
snapshot: snapshot,
}

if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, snapshotTS)
if cacheTable := b.getCacheTable(plan.TblInfo, snapshotTS); cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, cacheTable}
}
}

if plan.TblInfo.TempTableType != model.TempTableNone {
Expand Down
37 changes: 16 additions & 21 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,27 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
return nil
}

snapshot, err := b.getSnapshot()
if err != nil {
b.err = err
return nil
}

e := &PointGetExecutor{
baseExecutor: newBaseExecutor(b.ctx, p.Schema(), p.ID()),
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
}

if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(p.TblInfo, snapshotTS)
}

e.base().initCap = 1
e.base().maxChunkSize = 1
e.Init(p, snapshotTS)
e.Init(p, snapshotTS, snapshot)

if p.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
if cacheTable := b.getCacheTable(p.TblInfo, snapshotTS); cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, cacheTable}
}
}

if e.lock {
b.hasLock = true
Expand Down Expand Up @@ -112,18 +120,18 @@ type PointGetExecutor struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *runtimeStatsWithSnapshot
cacheTable kv.MemBuffer
stats *runtimeStatsWithSnapshot
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, snapshotTS uint64) {
func (e *PointGetExecutor) Init(p *plannercore.PointGetPlan, snapshotTS uint64, snapshot kv.Snapshot) {
xhebox marked this conversation as resolved.
Show resolved Hide resolved
decoder := NewRowDecoder(e.ctx, p.Schema(), p.TblInfo)
e.tblInfo = p.TblInfo
e.handle = p.Handle
e.idxInfo = p.IndexInfo
e.idxVals = p.IndexValues
e.snapshotTS = snapshotTS
e.snapshot = snapshot
e.done = false
if e.tblInfo.TempTableType == model.TempTableNone {
e.lock = p.Lock
Expand Down Expand Up @@ -152,24 +160,11 @@ func (e *PointGetExecutor) buildVirtualColumnInfo() {

// Open implements the Executor interface.
func (e *PointGetExecutor) Open(context.Context) error {
txnCtx := e.ctx.GetSessionVars().TxnCtx
snapshotTS := e.snapshotTS
var err error
e.txn, err = e.ctx.Txn(false)
if err != nil {
return err
}
if e.txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS {
e.snapshot = e.txn.GetSnapshot()
} else {
e.snapshot = e.ctx.GetSnapshotWithTS(snapshotTS)
}
if e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
e.snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
if e.cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, e.cacheTable}
}
if err := e.verifyTxnScope(); err != nil {
return err
}
Expand Down
7 changes: 0 additions & 7 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3156,13 +3156,6 @@ func (s *session) RefreshTxnCtx(ctx context.Context) error {
return sessiontxn.NewTxn(ctx, s)
}

// GetSnapshotWithTS returns a snapshot with ts.
func (s *session) GetSnapshotWithTS(ts uint64) kv.Snapshot {
snap := s.GetStore().GetSnapshot(kv.Version{Ver: ts})
snap.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
return snap
}

// GetStore gets the store of session.
func (s *session) GetStore() kv.Storage {
return s.store
Expand Down
14 changes: 14 additions & 0 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ func (m *txnManager) GetStmtForUpdateTS() (uint64, error) {
return ts, nil
}

func (m *txnManager) GetReadSnapshot() (kv.Snapshot, error) {
if m.ctxProvider == nil {
return nil, errors.New("context provider not set")
}
return m.ctxProvider.GetReadSnapshot()
}

func (m *txnManager) GetForUpdateSnapshot() (kv.Snapshot, error) {
if m.ctxProvider == nil {
return nil, errors.New("context provider not set")
}
return m.ctxProvider.GetForUpdateSnapshot()
}

func (m *txnManager) GetContextProvider() sessiontxn.TxnContextProvider {
return m.ctxProvider
}
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ type Context interface {
// only used to daemon session like `statsHandle` to detect global variable change.
RefreshVars(context.Context) error

// GetSnapshotWithTS returns a snapshot with start ts
GetSnapshotWithTS(ts uint64) kv.Snapshot

// GetStore returns the store of session.
GetStore() kv.Storage

Expand Down
8 changes: 8 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ type TxnContextProvider interface {
GetStmtReadTS() (uint64, error)
// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
GetStmtForUpdateTS() (uint64, error)
// GetReadSnapshot get snapshot with read ts
GetReadSnapshot() (kv.Snapshot, error)
// GetForUpdateSnapshot get snapshot with for update ts
GetForUpdateSnapshot() (kv.Snapshot, error)

// OnInitialize is the hook that should be called when enter a new txn with this provider
OnInitialize(ctx context.Context, enterNewTxnType EnterNewTxnType) error
Expand All @@ -147,6 +151,10 @@ type TxnManager interface {
GetStmtForUpdateTS() (uint64, error)
// GetContextProvider returns the current TxnContextProvider
GetContextProvider() TxnContextProvider
// GetReadSnapshot get snapshot with read ts
GetReadSnapshot() (kv.Snapshot, error)
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
// GetForUpdateSnapshot get snapshot with for update ts
GetForUpdateSnapshot() (kv.Snapshot, error)

// EnterNewTxn enters a new transaction.
EnterNewTxn(ctx context.Context, req *EnterNewTxnRequest) error
Expand Down
34 changes: 34 additions & 0 deletions sessiontxn/isolation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,37 @@ func (p *baseTxnContextProvider) AdviseWarmup() error {
func (p *baseTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}

// GetReadSnapshot get snapshot with read ts
func (p *baseTxnContextProvider) GetReadSnapshot() (kv.Snapshot, error) {
ts, err := p.GetStmtReadTS()
if err != nil {
return nil, err
}

return p.getSnapshotByTS(ts)
}

// GetForUpdateSnapshot get snapshot with for update ts
func (p *baseTxnContextProvider) GetForUpdateSnapshot() (kv.Snapshot, error) {
ts, err := p.GetStmtForUpdateTS()
if err != nil {
return nil, err
}

return p.getSnapshotByTS(ts)
}

func (p *baseTxnContextProvider) getSnapshotByTS(snapshotTS uint64) (kv.Snapshot, error) {
txn, err := p.sctx.Txn(false)
if err != nil {
return nil, err
}

txnCtx := p.sctx.GetSessionVars().TxnCtx
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS {
return txn.GetSnapshot(), nil
}

return sessiontxn.GetSnapshotWithTS(p.sctx, snapshotTS), nil
}
14 changes: 14 additions & 0 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,17 @@ func (p *PessimisticRCTxnContextProvider) AdviseOptimizeWithPlan(val interface{}

return nil
}

// GetReadSnapshot get snapshot with read ts
func (p *PessimisticRCTxnContextProvider) GetReadSnapshot() (kv.Snapshot, error) {
snapshot, err := p.baseTxnContextProvider.GetForUpdateSnapshot()
if err != nil {
return nil, err
}

if p.sctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}

return snapshot, nil
}
25 changes: 25 additions & 0 deletions sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,28 @@ func (p *StalenessTxnContextProvider) AdviseWarmup() error {
func (p *StalenessTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error {
return nil
}

// GetReadSnapshot get snapshot with read ts
func (p *StalenessTxnContextProvider) GetReadSnapshot() (kv.Snapshot, error) {
return p.getStalenessSnapshot()
}

// GetForUpdateSnapshot get snapshot with for update ts
func (p *StalenessTxnContextProvider) GetForUpdateSnapshot() (kv.Snapshot, error) {
return p.getStalenessSnapshot()
}

func (p *StalenessTxnContextProvider) getStalenessSnapshot() (kv.Snapshot, error) {
snapshotTS := p.ts
txn, err := p.sctx.Txn(false)
if err != nil {
return nil, err
}

txnCtx := p.sctx.GetSessionVars().TxnCtx
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == snapshotTS {
xhebox marked this conversation as resolved.
Show resolved Hide resolved
return txn.GetSnapshot(), nil
}

return sessiontxn.GetSnapshotWithTS(p.sctx, snapshotTS), nil
}
8 changes: 8 additions & 0 deletions sessiontxn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table/temptable"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -71,6 +72,13 @@ func CanReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool {
return txnCtx.History == nil && !txnCtx.IsStaleness && sessVars.SnapshotTS == 0
}

// GetSnapshotWithTS returns a snapshot with ts.
func GetSnapshotWithTS(s sessionctx.Context, ts uint64) kv.Snapshot {
snap := s.GetStore().GetSnapshot(kv.Version{Ver: ts})
snap.SetOption(kv.SnapInterceptor, temptable.SessionSnapshotInterceptor(s))
return snap
}

// SetTxnAssertionLevel sets assertion level of a transactin. Note that assertion level should be set only once just
// after creating a new transaction.
func SetTxnAssertionLevel(txn kv.Transaction, assertionLevel variable.AssertionLevel) {
Expand Down
Loading