txn: txnManager provides GetReadSnapshot and GetForUpdateSnapshot. #35788

Merged · 20 commits · Jul 1, 2022
Changes from 19 commits
2 changes: 1 addition & 1 deletion executor/adapter.go
@@ -259,7 +259,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan, startTs)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
}
}
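The dropped startTs argument is the visible edge of this PR's theme: executors stop carrying raw timestamps and instead receive a snapshot resolved through the session's txnManager when they are built. A minimal, self-contained Go sketch of that calling-convention change (all names below are illustrative stand-ins, not the real executor types):

```go
package main

import "fmt"

type plan struct{ id int }

// snapshot stands in for kv.Snapshot: a point-in-time read view.
type snapshot struct{ ts uint64 }

// Before: re-initializing a cached executor had to thread a timestamp through.
type oldExec struct {
	p  *plan
	ts uint64
}

func (e *oldExec) Init(p *plan, startTS uint64) { e.p, e.ts = p, startTS }

// After: Init only rebinds the plan; the builder injects the snapshot once,
// so a reused executor cannot pick up a stale timestamp argument.
type newExec struct {
	p    *plan
	snap snapshot
}

func (e *newExec) Init(p *plan) { e.p = p }

func main() {
	e := &newExec{snap: snapshot{ts: 100}} // set by the builder, not the caller
	e.Init(&plan{id: 1})
	fmt.Println(e.p.id, e.snap.ts) // 1 100
}
```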
106 changes: 27 additions & 79 deletions executor/batch_point_get.go
@@ -22,8 +22,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
@@ -40,35 +38,31 @@ import (
"github.com/pingcap/tidb/util/logutil/consistency"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

// BatchPointGetExec executes a bunch of point select queries.
type BatchPointGetExec struct {
baseExecutor

tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []kv.Handle
physIDs []int64
partExpr *tables.PartitionExpr
partPos int
singlePart bool
partTblID int64
idxVals [][]types.Datum
readReplicaScope string
isStaleness bool
snapshotTS uint64
txn kv.Transaction
lock bool
waitTime int64
inited uint32
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
keepOrder bool
desc bool
batchGetter kv.BatchGetter
tblInfo *model.TableInfo
idxInfo *model.IndexInfo
handles []kv.Handle
physIDs []int64
partExpr *tables.PartitionExpr
partPos int
singlePart bool
partTblID int64
idxVals [][]types.Datum
txn kv.Transaction
lock bool
waitTime int64
inited uint32
values [][]byte
index int
rowDecoder *rowcodec.ChunkDecoder
keepOrder bool
desc bool
batchGetter kv.BatchGetter

columns []*model.ColumnInfo
// virtualColumnIndex records all the indices of virtual columns and sorts them in definition
@@ -78,9 +72,8 @@ type BatchPointGetExec struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
cacheTable kv.MemBuffer
snapshot kv.Snapshot
stats *runtimeStatsWithSnapshot
}

// buildVirtualColumnInfo saves virtual column indices and sorts them in definition order
@@ -98,69 +91,24 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {
func (e *BatchPointGetExec) Open(context.Context) error {
sessVars := e.ctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
stmtCtx := sessVars.StmtCtx
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
e.txn = txn
var snapshot kv.Snapshot
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() && txnCtx.StartTS == e.snapshotTS {
// We can safely reuse the transaction snapshot if snapshotTS is equal to forUpdateTS.
// The snapshot may contain cache that can reduce RPC call.
snapshot = txn.GetSnapshot()
} else {
snapshot = e.ctx.GetSnapshotWithTS(e.snapshotTS)
}
if e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.IsolationLevel, kv.RCCheckTS)
}
if e.cacheTable != nil {
snapshot = cacheTableSnapshot{snapshot, e.cacheTable}
}
if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
stmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
replicaReadType := e.ctx.GetSessionVars().GetReplicaRead()
if replicaReadType.IsFollowerRead() && !e.ctx.GetSessionVars().StmtCtx.RCCheckTS {
snapshot.SetOption(kv.ReplicaRead, replicaReadType)
}
snapshot.SetOption(kv.TaskID, stmtCtx.TaskID)
snapshot.SetOption(kv.ReadReplicaScope, e.readReplicaScope)
snapshot.SetOption(kv.IsStalenessReadOnly, e.isStaleness)
failpoint.Inject("assertBatchPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if replicaReadType.IsClosestRead() && assertScope != e.readReplicaScope {
panic("batch point get replica option fail")
}
})

if replicaReadType.IsClosestRead() && e.readReplicaScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: e.readReplicaScope,
},
})
}
setOptionForTopSQL(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot

setOptionForTopSQL(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
var batchGetter kv.BatchGetter = e.snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
if e.lock {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot)
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, e.snapshot)
} else if lock != nil && (lock.Tp == model.TableLockRead || lock.Tp == model.TableLockReadOnly) && e.ctx.GetSessionVars().EnablePointGetCache {
batchGetter = newCacheBatchGetter(e.ctx, e.tblInfo.ID, snapshot)
batchGetter = newCacheBatchGetter(e.ctx, e.tblInfo.ID, e.snapshot)
} else {
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, snapshot)
batchGetter = driver.NewBufferBatchGetter(txn.GetMemBuffer(), nil, e.snapshot)
}
}
e.snapshot = snapshot
e.batchGetter = batchGetter
return nil
}
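After this change, Open no longer builds or configures the snapshot (that work moved into the builder's getSnapshot, shown below); its remaining job is deciding how to layer the transaction's mem-buffer and caches over the snapshot the builder installed. A rough, self-contained sketch of that layering decision, with simplified stand-ins for kv.BatchGetter and driver.NewBufferBatchGetter:

```go
package main

import "fmt"

// BatchGetter stands in for kv.BatchGetter: anything that serves batched reads.
type BatchGetter interface{ Name() string }

type snapshotGetter struct{}

func (snapshotGetter) Name() string { return "snapshot" }

type lockCacheGetter struct{}

func (lockCacheGetter) Name() string { return "pessimistic-lock-cache" }

// buffered mimics driver.NewBufferBatchGetter: consult the txn mem-buffer,
// then an optional middle cache, then fall back to the snapshot.
type buffered struct{ middle, fallback BatchGetter }

func (b buffered) Name() string {
	s := "membuffer -> "
	if b.middle != nil {
		s += b.middle.Name() + " -> "
	}
	return s + b.fallback.Name()
}

func chooseGetter(txnValid, lock bool, snap BatchGetter) BatchGetter {
	if !txnValid {
		return snap // no active txn: read straight from the snapshot
	}
	if lock {
		// SELECT ... FOR UPDATE also consults the pessimistic lock cache
		return buffered{middle: lockCacheGetter{}, fallback: snap}
	}
	return buffered{fallback: snap}
}

func main() {
	fmt.Println(chooseGetter(false, false, snapshotGetter{}).Name())
	fmt.Println(chooseGetter(true, true, snapshotGetter{}).Name())
}
```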
100 changes: 78 additions & 22 deletions executor/builder.go
@@ -29,8 +29,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
@@ -68,6 +70,7 @@ import (
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

var (
@@ -1528,6 +1531,39 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return txnManager.GetStmtReadTS()
}

// getSnapshot gets the appropriate snapshot from the txnManager and sets
// the relevant snapshot options before returning.
func (b *executorBuilder) getSnapshot() (kv.Snapshot, error) {
var snapshot kv.Snapshot
var err error

txnManager := sessiontxn.GetTxnManager(b.ctx)
if b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt {
snapshot, err = txnManager.GetForUpdateSnapshot()
} else {
snapshot, err = txnManager.GetReadSnapshot()
}
if err != nil {
return nil, err
}

sessVars := b.ctx.GetSessionVars()
replicaReadType := sessVars.GetReplicaRead()
snapshot.SetOption(kv.ReadReplicaScope, b.readReplicaScope)
snapshot.SetOption(kv.TaskID, sessVars.StmtCtx.TaskID)

if replicaReadType.IsClosestRead() && b.readReplicaScope != kv.GlobalTxnScope {
snapshot.SetOption(kv.MatchStoreLabels, []*metapb.StoreLabel{
{
Key: placement.DCLabelKey,
Value: b.readReplicaScope,
},
})
}

return snapshot, nil
}
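The branch above is the core contract of the new API: statements that write or lock (insert, update, delete, SELECT ... FOR UPDATE) read through the for-update snapshot, everything else through the plain read snapshot. A self-contained sketch of that selection rule under stand-in types (the real GetReadSnapshot/GetForUpdateSnapshot live on sessiontxn's txnManager):

```go
package main

import "fmt"

// Snapshot stands in for kv.Snapshot.
type Snapshot struct{ ts uint64 }

// txnManager stands in for the sessiontxn txnManager: it owns the decision
// of which timestamp a statement's read view uses.
type txnManager struct{ readTS, forUpdateTS uint64 }

func (m *txnManager) GetReadSnapshot() (Snapshot, error)      { return Snapshot{m.readTS}, nil }
func (m *txnManager) GetForUpdateSnapshot() (Snapshot, error) { return Snapshot{m.forUpdateTS}, nil }

// getSnapshot mirrors the builder's branch: DML and SELECT ... FOR UPDATE
// read at the for-update timestamp, plain reads at the statement read TS.
func getSnapshot(m *txnManager, inWriteOrLockStmt bool) (Snapshot, error) {
	if inWriteOrLockStmt {
		return m.GetForUpdateSnapshot()
	}
	return m.GetReadSnapshot()
}

func main() {
	m := &txnManager{readTS: 100, forUpdateTS: 105}
	s1, _ := getSnapshot(m, false)
	s2, _ := getSnapshot(m, true)
	fmt.Println(s1.ts, s2.ts) // 100 105
}
```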

func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {
switch v.DBName.L {
case util.MetricSchemaName.L:
@@ -4549,7 +4585,8 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if err := b.validCanReadTemporaryOrCacheTable(plan.TblInfo); err != nil {
var err error
if err = b.validCanReadTemporaryOrCacheTable(plan.TblInfo); err != nil {
b.err = err
return nil
}
@@ -4561,34 +4598,53 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
}()
}

snapshotTS, err := b.getSnapshotTS()
decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
keepOrder: plan.KeepOrder,
desc: plan.Desc,
lock: plan.Lock,
waitTime: plan.LockWaitTime,
partExpr: plan.PartitionExpr,
partPos: plan.PartitionColPos,
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
}

e.snapshot, err = b.getSnapshot()
if err != nil {
b.err = err
return nil
}

decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
tblInfo: plan.TblInfo,
idxInfo: plan.IndexInfo,
rowDecoder: decoder,
snapshotTS: snapshotTS,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
keepOrder: plan.KeepOrder,
desc: plan.Desc,
lock: plan.Lock,
waitTime: plan.LockWaitTime,
partExpr: plan.PartitionExpr,
partPos: plan.PartitionColPos,
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
}

if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}

failpoint.Inject("assertBatchPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if e.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && assertScope != b.readReplicaScope {
panic("batch point get replica option fail")
}
})

snapshotTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
}
if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, snapshotTS)
if cacheTable := b.getCacheTable(plan.TblInfo, snapshotTS); cacheTable != nil {
e.snapshot = cacheTableSnapshot{e.snapshot, cacheTable}
}
}

if plan.TblInfo.TempTableType != model.TempTableNone {
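One behavioral nuance of the last hunk: instead of handing the executor a separate cacheTable buffer, the builder now wraps the snapshot itself, so every read transparently prefers the in-memory cached table. A self-contained sketch of that decorator idea (cacheTableSnapshot in the executor package works along these lines; the types below are simplified stand-ins):

```go
package main

import "fmt"

// Getter stands in for the Get path of kv.Snapshot.
type Getter interface {
	Get(key string) (string, bool)
}

type storeSnapshot map[string]string

func (s storeSnapshot) Get(k string) (string, bool) { v, ok := s[k]; return v, ok }

// cachedTableSnapshot decorates a snapshot with an in-memory table cache,
// mirroring how cacheTableSnapshot layers kv.MemBuffer over kv.Snapshot.
type cachedTableSnapshot struct {
	inner Getter
	cache map[string]string
}

func (c cachedTableSnapshot) Get(k string) (string, bool) {
	if v, ok := c.cache[k]; ok {
		return v, true // serve from the cached table first
	}
	return c.inner.Get(k) // fall back to the real snapshot
}

func main() {
	snap := storeSnapshot{"k1": "from-store"}
	wrapped := cachedTableSnapshot{inner: snap, cache: map[string]string{"k1": "from-cache"}}
	fmt.Println(wrapped.Get("k1")) // from-cache true
}
```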