Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv: remove use of Priority transaction options in store/tikv #24360

Merged
merged 7 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
priority: tikvstore.PriorityLow,
priority: kv.PriorityLow,
}
}

Expand Down
3 changes: 1 addition & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -425,7 +424,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
SetConcurrency(1).SetDesc(true)

builder.Request.NotFillCache = true
builder.Request.Priority = tikvstore.PriorityLow
builder.Request.Priority = kv.PriorityLow

kvReq, err := builder.Build()
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -146,7 +145,7 @@ func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestB
builder.Request.Data, builder.err = ana.Marshal()
builder.Request.NotFillCache = true
builder.Request.IsolationLevel = kv.RC
builder.Request.Priority = tikvstore.PriorityLow
builder.Request.Priority = kv.PriorityLow
}

return builder
Expand Down Expand Up @@ -210,13 +209,13 @@ func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
switch sv.StmtCtx.Priority {
case mysql.NoPriority, mysql.DelayedPriority:
return tikvstore.PriorityNormal
return kv.PriorityNormal
case mysql.LowPriority:
return tikvstore.PriorityLow
return kv.PriorityLow
case mysql.HighPriority:
return tikvstore.PriorityHigh
return kv.PriorityHigh
}
return tikvstore.PriorityNormal
return kv.PriorityNormal
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
Expand Down
10 changes: 5 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
if err != nil {
return nil, err
}
a.Ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityHigh
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

// try to reuse point get executor
if a.PsStmt.Executor != nil {
Expand Down Expand Up @@ -730,15 +730,15 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = tikvstore.PriorityHigh
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = tikvstore.PriorityLow
stmtCtx.Priority = kv.PriorityLow
}
}
}
}
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema)
Expand All @@ -758,7 +758,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
a.isPreparedStmt = true
a.Plan = executorExec.plan
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
}
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
return nil, errors.Trace(err)
}
}
txn.SetOption(tikvstore.Priority, tikvstore.PriorityLow)
txn.SetOption(tikvstore.Priority, kv.PriorityLow)
txn.SetOption(tikvstore.IsolationLevel, kv.RC)
txn.SetOption(tikvstore.NotFillCache, true)
return rollbackFn, nil
Expand Down Expand Up @@ -1186,7 +1186,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(tikvstore.NotFillCache, true)
snapshot.SetOption(tikvstore.IsolationLevel, kv.RC)
snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityLow)
snapshot.SetOption(tikvstore.Priority, kv.PriorityLow)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
}
Expand Down
7 changes: 7 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,13 @@ type SplittableStore interface {
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Priority value for transaction priority.
const (
PriorityNormal = iota
PriorityLow
PriorityHigh
)

// IsoLevel is the transaction's isolation level.
type IsoLevel int

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s testMockSuite) TestInterface(c *C) {
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
c.Check(err, IsNil)
snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityNormal)
snapshot.SetOption(tikvstore.Priority, PriorityNormal)

transaction, err := storage.Begin()
c.Check(err, IsNil)
Expand Down
10 changes: 5 additions & 5 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ type Meta struct {
// NewMeta creates a Meta in transaction txn.
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
txn.SetOption(tikvstore.Priority, tikvstore.PriorityHigh)
txn.SetOption(tikvstore.Priority, kv.PriorityHigh)
txn.SetOption(tikvstore.SyncLog, true)
t := structure.NewStructure(txn, txn, mMetaPrefix)
listKey := DefaultJobListKey
Expand Down Expand Up @@ -636,13 +636,13 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {

job := &model.Job{
// For compatibility, if the job is enqueued by old version TiDB and Priority field is omitted,
// set the default priority to tikvstore.PriorityLow.
Priority: tikvstore.PriorityLow,
// set the default priority to kv.PriorityLow.
Priority: kv.PriorityLow,
}
err = job.Decode(value)
// Check if the job.Priority is valid.
if job.Priority < tikvstore.PriorityNormal || job.Priority > tikvstore.PriorityHigh {
job.Priority = tikvstore.PriorityLow
if job.Priority < kv.PriorityNormal || job.Priority > kv.PriorityHigh {
job.Priority = kv.PriorityLow
}
return job, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1714,7 +1714,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
s.txn.changeToInvalid()
case *plannercore.Update:
s.PrepareTSFuture(ctx)
stmtCtx.Priority = tikvstore.PriorityHigh
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
case nil:
// cache is invalid
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ func NewSessionVars() *SessionVars {
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: tikvstore.PriorityLow,
DDLReorgPriority: kv.PriorityLow,
allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
preferRangeScan: DefOptPreferRangeScan,
CorrelationThreshold: DefOptCorrelationThreshold,
Expand Down Expand Up @@ -1308,13 +1308,13 @@ func (s *SessionVars) setDDLReorgPriority(val string) {
val = strings.ToLower(val)
switch val {
case "priority_low":
s.DDLReorgPriority = tikvstore.PriorityLow
s.DDLReorgPriority = kv.PriorityLow
case "priority_normal":
s.DDLReorgPriority = tikvstore.PriorityNormal
s.DDLReorgPriority = kv.PriorityNormal
case "priority_high":
s.DDLReorgPriority = tikvstore.PriorityHigh
s.DDLReorgPriority = kv.PriorityHigh
default:
s.DDLReorgPriority = tikvstore.PriorityLow
s.DDLReorgPriority = kv.PriorityLow
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffe

req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(b.req.IsolationLevel),
Priority: tikv.PriorityToPB(b.req.Priority),
Priority: priorityToPB(b.req.Priority),
NotFillCache: b.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
Expand Down
14 changes: 13 additions & 1 deletion store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: tikv.PriorityToPB(worker.req.Priority),
Priority: priorityToPB(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
Expand Down Expand Up @@ -1192,6 +1192,18 @@ func (e *rateLimitAction) isEnabled() bool {
return atomic.LoadUint32(&e.enabled) > 0
}

// priorityToPB converts priority type to wire type.
func priorityToPB(pri int) kvrpcpb.CommandPri {
switch pri {
case kv.PriorityLow:
return kvrpcpb.CommandPri_Low
case kv.PriorityHigh:
return kvrpcpb.CommandPri_High
default:
return kvrpcpb.CommandPri_Normal
}
}

func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
switch level {
case kv.RC:
Expand Down
13 changes: 13 additions & 0 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
s.KVSnapshot.SetIsolationLevel(level)
case tikvstore.Priority:
s.KVSnapshot.SetPriority(getTiKVPriority(val.(int)))
default:
s.KVSnapshot.SetOption(opt, val)
}
Expand All @@ -87,3 +89,14 @@ func getTiKVIsolationLevel(level kv.IsoLevel) tikv.IsoLevel {
return tikv.SI
}
}

func getTiKVPriority(pri int) tikv.Priority {
switch pri {
case kv.PriorityHigh:
return tikv.PriorityHigh
case kv.PriorityLow:
return tikv.PriorityLow
default:
return tikv.PriorityNormal
}
}
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
txn.KVTxn.GetSnapshot().SetIsolationLevel(level)
case tikvstore.Priority:
txn.KVTxn.SetPriority(getTiKVPriority(val.(int)))
case tikvstore.Pessimistic:
txn.SetPessimistic(val.(bool))
default:
Expand Down
21 changes: 1 addition & 20 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
c.hasNoNeedCommitKeys = checkCnt > 0
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = getTxnPriority(txn)
c.priority = txn.priority.ToPB()
c.syncLog = getTxnSyncLog(txn)
c.setDetail(commitDetail)
return nil
Expand Down Expand Up @@ -1649,32 +1649,13 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error {
return err
}

func getTxnPriority(txn *KVTxn) pb.CommandPri {
if pri := txn.us.GetOption(kv.Priority); pri != nil {
return PriorityToPB(pri.(int))
}
return pb.CommandPri_Normal
}

func getTxnSyncLog(txn *KVTxn) bool {
if syncOption := txn.us.GetOption(kv.SyncLog); syncOption != nil {
return syncOption.(bool)
}
return false
}

// PriorityToPB converts priority type to wire type.
func PriorityToPB(pri int) pb.CommandPri {
switch pri {
case kv.PriorityLow:
return pb.CommandPri_Low
case kv.PriorityHigh:
return pb.CommandPri_High
default:
return pb.CommandPri_Normal
}
}

func (c *twoPhaseCommitter) setDetail(d *util.CommitDetails) {
atomic.StorePointer(&c.detail, unsafe.Pointer(d))
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const (
)

// Priority value for transaction priority.
// TODO: remove after BR update.
const (
PriorityNormal = iota
PriorityLow
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
}
sreq := &pb.ScanRequest{
Context: &pb.Context{
Priority: s.snapshot.priority,
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
},
Expand All @@ -207,7 +207,7 @@ func (s *Scanner) getData(bo *Backoffer) error {
}
s.snapshot.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, pb.Context{
Priority: s.snapshot.priority,
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
})
Expand Down
Loading