diff --git a/ddl/backfilling.go b/ddl/backfilling.go index cd8935572f45c..56512eec6ab65 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -159,7 +159,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab sessCtx: sessCtx, taskCh: make(chan *reorgBackfillTask, 1), resultCh: make(chan *backfillResult, 1), - priority: tikvstore.PriorityLow, + priority: kv.PriorityLow, } } diff --git a/ddl/reorg.go b/ddl/reorg.go index dd24b2934dc4b..fbe42573dbbf7 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/statistics" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" @@ -425,7 +424,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta SetConcurrency(1).SetDesc(true) builder.Request.NotFillCache = true - builder.Request.Priority = tikvstore.PriorityLow + builder.Request.Priority = kv.PriorityLow kvReq, err := builder.Build() if err != nil { diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 9dfd4124304b4..ce577b993d009 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" - tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -146,7 +145,7 @@ func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestB builder.Request.Data, builder.err = ana.Marshal() builder.Request.NotFillCache = true builder.Request.IsolationLevel = kv.RC - builder.Request.Priority = tikvstore.PriorityLow + builder.Request.Priority = kv.PriorityLow } return builder @@ -210,13 +209,13 @@ func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel { func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int { switch sv.StmtCtx.Priority { case mysql.NoPriority, mysql.DelayedPriority: - return tikvstore.PriorityNormal + return kv.PriorityNormal case mysql.LowPriority: - return tikvstore.PriorityLow + return kv.PriorityLow case mysql.HighPriority: - return tikvstore.PriorityHigh + return kv.PriorityHigh } - return tikvstore.PriorityNormal + return kv.PriorityNormal } // SetFromSessionVars sets the following fields for "kv.Request" from session variables: diff --git a/executor/adapter.go b/executor/adapter.go index 5a65a31e49c7d..5e5b7990f61d9 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -213,7 +213,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec if err != nil { return nil, err } - a.Ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityHigh + a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh // try to reuse point get executor if a.PsStmt.Executor != nil { @@ -730,15 +730,15 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority { switch { case useMaxTS: - stmtCtx.Priority = tikvstore.PriorityHigh + stmtCtx.Priority = kv.PriorityHigh case a.LowerPriority: - stmtCtx.Priority = tikvstore.PriorityLow + stmtCtx.Priority = kv.PriorityLow } } } } if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL { - ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow + ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow } b := newExecutorBuilder(ctx, a.InfoSchema) @@ -758,7 +758,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) { a.isPreparedStmt = true a.Plan = executorExec.plan if executorExec.lowerPriority { - ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow + ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow } e = executorExec.stmtExec } diff --git a/executor/analyze.go b/executor/analyze.go index f595931d0c036..8a2e665587aa4 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -964,7 +964,7 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err return nil, errors.Trace(err) } } - txn.SetOption(tikvstore.Priority, tikvstore.PriorityLow) + txn.SetOption(tikvstore.Priority, kv.PriorityLow) txn.SetOption(tikvstore.IsolationLevel, kv.RC) txn.SetOption(tikvstore.NotFillCache, true) return rollbackFn, nil @@ -1186,7 +1186,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) { snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion) snapshot.SetOption(tikvstore.NotFillCache, true) snapshot.SetOption(tikvstore.IsolationLevel, kv.RC) - snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityLow) + snapshot.SetOption(tikvstore.Priority, kv.PriorityLow) if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower) } diff --git a/kv/kv.go b/kv/kv.go index 4f020bbc9b3f9..1b1e1a5f46a4a 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -439,6 +439,13 @@ type SplittableStore interface { CheckRegionInScattering(regionID uint64) (bool, error) } +// Priority value for transaction priority. +const ( + PriorityNormal = iota + PriorityLow + PriorityHigh +) + // IsoLevel is the transaction's isolation level. type IsoLevel int diff --git a/kv/mock_test.go b/kv/mock_test.go index 6b8cec5ae9f66..45e45d5941251 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -35,7 +35,7 @@ func (s testMockSuite) TestInterface(c *C) { snapshot := storage.GetSnapshot(version) _, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")}) c.Check(err, IsNil) - snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityNormal) + snapshot.SetOption(tikvstore.Priority, PriorityNormal) transaction, err := storage.Begin() c.Check(err, IsNil) diff --git a/meta/meta.go b/meta/meta.go index 06bafa9dcd546..e5a0c97658b43 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -94,7 +94,7 @@ type Meta struct { // NewMeta creates a Meta in transaction txn. // If the current Meta needs to handle a job, jobListKey is the type of the job's list. func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta { - txn.SetOption(tikvstore.Priority, tikvstore.PriorityHigh) + txn.SetOption(tikvstore.Priority, kv.PriorityHigh) txn.SetOption(tikvstore.SyncLog, true) t := structure.NewStructure(txn, txn, mMetaPrefix) listKey := DefaultJobListKey @@ -636,13 +636,13 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) { job := &model.Job{ // For compatibility, if the job is enqueued by old version TiDB and Priority field is omitted, - // set the default priority to tikvstore.PriorityLow. - Priority: tikvstore.PriorityLow, + // set the default priority to kv.PriorityLow. + Priority: kv.PriorityLow, } err = job.Decode(value) // Check if the job.Priority is valid. - if job.Priority < tikvstore.PriorityNormal || job.Priority > tikvstore.PriorityHigh { - job.Priority = tikvstore.PriorityLow + if job.Priority < kv.PriorityNormal || job.Priority > kv.PriorityHigh { + job.Priority = kv.PriorityLow } return job, errors.Trace(err) } diff --git a/session/session.go b/session/session.go index 2e78d6167d73c..52d59514d0223 100644 --- a/session/session.go +++ b/session/session.go @@ -1714,7 +1714,7 @@ func (s *session) cachedPlanExec(ctx context.Context, s.txn.changeToInvalid() case *plannercore.Update: s.PrepareTSFuture(ctx) - stmtCtx.Priority = tikvstore.PriorityHigh + stmtCtx.Priority = kv.PriorityHigh resultSet, err = runStmt(ctx, s, stmt) case nil: // cache is invalid diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6e75bd7ca010c..536f446b16d68 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -962,7 +962,7 @@ func NewSessionVars() *SessionVars { OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel, RetryLimit: DefTiDBRetryLimit, DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry, - DDLReorgPriority: tikvstore.PriorityLow, + DDLReorgPriority: kv.PriorityLow, allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg, preferRangeScan: DefOptPreferRangeScan, CorrelationThreshold: DefOptCorrelationThreshold, @@ -1308,13 +1308,13 @@ func (s *SessionVars) setDDLReorgPriority(val string) { val = strings.ToLower(val) switch val { case "priority_low": - s.DDLReorgPriority = tikvstore.PriorityLow + s.DDLReorgPriority = kv.PriorityLow case "priority_normal": - s.DDLReorgPriority = tikvstore.PriorityNormal + s.DDLReorgPriority = kv.PriorityNormal case "priority_high": - s.DDLReorgPriority = tikvstore.PriorityHigh + s.DDLReorgPriority = kv.PriorityHigh default: - s.DDLReorgPriority = tikvstore.PriorityLow + s.DDLReorgPriority = kv.PriorityLow } } diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index af6fb47676d1a..cec3e49644363 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -346,7 +346,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffe req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{ IsolationLevel: isolationLevelToPB(b.req.IsolationLevel), - Priority: tikv.PriorityToPB(b.req.Priority), + Priority: priorityToPB(b.req.Priority), NotFillCache: b.req.NotFillCache, RecordTimeStat: true, RecordScanStat: true, diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index dbb9dc807d5e8..19a93c217d04c 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -698,7 +698,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, &worker.replicaReadSeed, kvrpcpb.Context{ IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel), - Priority: tikv.PriorityToPB(worker.req.Priority), + Priority: priorityToPB(worker.req.Priority), NotFillCache: worker.req.NotFillCache, RecordTimeStat: true, RecordScanStat: true, @@ -1192,6 +1192,18 @@ func (e *rateLimitAction) isEnabled() bool { return atomic.LoadUint32(&e.enabled) > 0 } +// priorityToPB converts priority type to wire type. +func priorityToPB(pri int) kvrpcpb.CommandPri { + switch pri { + case kv.PriorityLow: + return kvrpcpb.CommandPri_Low + case kv.PriorityHigh: + return kvrpcpb.CommandPri_High + default: + return kvrpcpb.CommandPri_Normal + } +} + func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel { switch level { case kv.RC: diff --git a/store/driver/txn/snapshot.go b/store/driver/txn/snapshot.go index 5fa8f67f90905..28ecbd567e786 100644 --- a/store/driver/txn/snapshot.go +++ b/store/driver/txn/snapshot.go @@ -67,6 +67,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) { case tikvstore.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) s.KVSnapshot.SetIsolationLevel(level) + case tikvstore.Priority: + s.KVSnapshot.SetPriority(getTiKVPriority(val.(int))) default: s.KVSnapshot.SetOption(opt, val) } @@ -87,3 +89,14 @@ func getTiKVIsolationLevel(level kv.IsoLevel) tikv.IsoLevel { return tikv.SI } } + +func getTiKVPriority(pri int) tikv.Priority { + switch pri { + case kv.PriorityHigh: + return tikv.PriorityHigh + case kv.PriorityLow: + return tikv.PriorityLow + default: + return tikv.PriorityNormal + } +} diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 9a5731c14973e..ddec266419fd9 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -134,6 +134,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) { case tikvstore.IsolationLevel: level := getTiKVIsolationLevel(val.(kv.IsoLevel)) txn.KVTxn.GetSnapshot().SetIsolationLevel(level) + case tikvstore.Priority: + txn.KVTxn.SetPriority(getTiKVPriority(val.(int))) case tikvstore.Pessimistic: txn.SetPessimistic(val.(bool)) default: diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index b01e18eebd5d1..e2172d17bd806 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -425,7 +425,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error { metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) c.hasNoNeedCommitKeys = checkCnt > 0 c.lockTTL = txnLockTTL(txn.startTime, size) - c.priority = getTxnPriority(txn) + c.priority = txn.priority.ToPB() c.syncLog = getTxnSyncLog(txn) c.setDetail(commitDetail) return nil @@ -1649,13 +1649,6 @@ func (batchExe *batchExecutor) process(batches []batchMutations) error { return err } -func getTxnPriority(txn *KVTxn) pb.CommandPri { - if pri := txn.us.GetOption(kv.Priority); pri != nil { - return PriorityToPB(pri.(int)) - } - return pb.CommandPri_Normal -} - func getTxnSyncLog(txn *KVTxn) bool { if syncOption := txn.us.GetOption(kv.SyncLog); syncOption != nil { return syncOption.(bool) @@ -1663,18 +1656,6 @@ func getTxnSyncLog(txn *KVTxn) bool { return false } -// PriorityToPB converts priority type to wire type. -func PriorityToPB(pri int) pb.CommandPri { - switch pri { - case kv.PriorityLow: - return pb.CommandPri_Low - case kv.PriorityHigh: - return pb.CommandPri_High - default: - return pb.CommandPri_Normal - } -} - func (c *twoPhaseCommitter) setDetail(d *util.CommitDetails) { atomic.StorePointer(&c.detail, unsafe.Pointer(d)) } diff --git a/store/tikv/kv/option.go b/store/tikv/kv/option.go index a87ddcb7214a4..bac9316d41773 100644 --- a/store/tikv/kv/option.go +++ b/store/tikv/kv/option.go @@ -62,6 +62,7 @@ const ( ) // Priority value for transaction priority. +// TODO: remove after BR update. const ( PriorityNormal = iota PriorityLow diff --git a/store/tikv/scan.go b/store/tikv/scan.go index d2d151f59627f..19a14b3f73819 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -189,7 +189,7 @@ func (s *Scanner) getData(bo *Backoffer) error { } sreq := &pb.ScanRequest{ Context: &pb.Context{ - Priority: s.snapshot.priority, + Priority: s.snapshot.priority.ToPB(), NotFillCache: s.snapshot.notFillCache, IsolationLevel: s.snapshot.isolationLevel.ToPB(), }, @@ -207,7 +207,7 @@ func (s *Scanner) getData(bo *Backoffer) error { } s.snapshot.mu.RLock() req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, pb.Context{ - Priority: s.snapshot.priority, + Priority: s.snapshot.priority.ToPB(), NotFillCache: s.snapshot.notFillCache, TaskId: s.snapshot.mu.taskID, }) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 1e54955f5fe4e..b7159c59931c5 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -45,6 +45,21 @@ const ( maxTimestamp = math.MaxUint64 ) +// Priority is the priority for tikv to execute a command. +type Priority kvrpcpb.CommandPri + +// Priority value for transaction priority. +const ( + PriorityNormal = Priority(kvrpcpb.CommandPri_Normal) + PriorityLow = Priority(kvrpcpb.CommandPri_Low) + PriorityHigh = Priority(kvrpcpb.CommandPri_High) +) + +// ToPB converts priority to wire type. +func (p Priority) ToPB() kvrpcpb.CommandPri { + return kvrpcpb.CommandPri(p) +} + // IsoLevel is the transaction's isolation level. type IsoLevel kvrpcpb.IsolationLevel @@ -65,7 +80,7 @@ type KVSnapshot struct { store *KVStore version uint64 isolationLevel IsoLevel - priority pb.CommandPri + priority Priority notFillCache bool syncLog bool keyOnly bool @@ -106,7 +121,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnaps return &KVSnapshot{ store: store, version: ts, - priority: pb.CommandPri_Normal, + priority: PriorityNormal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, resolvedLocks: util.NewTSSet(5), @@ -293,7 +308,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec Keys: pending, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ - Priority: s.priority, + Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, }) @@ -443,7 +458,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, Key: k, Version: s.version, }, s.mu.replicaRead, &s.replicaReadSeed, pb.Context{ - Priority: s.priority, + Priority: s.priority.ToPB(), NotFillCache: s.notFillCache, TaskId: s.mu.taskID, }) @@ -551,8 +566,6 @@ func (s *KVSnapshot) IterReverse(k []byte) (unionstore.Iterator, error) { // value of this option. Only ReplicaRead is supported for snapshot func (s *KVSnapshot) SetOption(opt int, val interface{}) { switch opt { - case kv.Priority: - s.priority = PriorityToPB(val.(int)) case kv.NotFillCache: s.notFillCache = val.(bool) case kv.SyncLog: @@ -610,6 +623,11 @@ func (s *KVSnapshot) SetIsolationLevel(level IsoLevel) { s.isolationLevel = level } +// SetPriority sets the priority for tikv to execute commands. +func (s *KVSnapshot) SetPriority(pri Priority) { + s.priority = pri +} + // SnapCacheHitCount gets the snapshot cache hit count. Only for test. func (s *KVSnapshot) SnapCacheHitCount() int { return int(atomic.LoadInt64(&s.mu.hitCnt)) diff --git a/store/tikv/tests/store_test.go b/store/tikv/tests/store_test.go index d10b09defd2ad..659dc6ea8f226 100644 --- a/store/tikv/tests/store_test.go +++ b/store/tikv/tests/store_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/tikv" - "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -124,7 +123,7 @@ func (s *testStoreSuite) TestRequestPriority(c *C) { txn, err := s.store.Begin() c.Assert(err, IsNil) client.priority = pb.CommandPri_High - txn.SetOption(kv.Priority, kv.PriorityHigh) + txn.SetPriority(tikv.PriorityHigh) err = txn.Set([]byte("key"), []byte("value")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) @@ -134,20 +133,20 @@ func (s *testStoreSuite) TestRequestPriority(c *C) { txn, err = s.store.Begin() c.Assert(err, IsNil) client.priority = pb.CommandPri_Low - txn.SetOption(kv.Priority, kv.PriorityLow) + txn.SetPriority(tikv.PriorityLow) _, err = txn.Get(context.TODO(), []byte("key")) c.Assert(err, IsNil) // A counter example. client.priority = pb.CommandPri_Low - txn.SetOption(kv.Priority, kv.PriorityNormal) + txn.SetPriority(tikv.PriorityNormal) _, err = txn.Get(context.TODO(), []byte("key")) // err is translated to "try again later" by backoffer, so doesn't check error value here. c.Assert(err, NotNil) // Cover Seek request. client.priority = pb.CommandPri_High - txn.SetOption(kv.Priority, kv.PriorityHigh) + txn.SetPriority(tikv.PriorityHigh) iter, err := txn.Iter([]byte("key"), nil) c.Assert(err, IsNil) for iter.Valid() { diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 67477c510cee4..1377d02d28581 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -76,6 +76,7 @@ type KVTxn struct { binlog BinlogExecutor schemaLeaseChecker SchemaLeaseChecker + priority Priority isPessimistic bool kvFilter KVFilter } @@ -211,6 +212,12 @@ func (txn *KVTxn) SetPessimistic(b bool) { txn.isPessimistic = b } +// SetPriority sets the priority for both write and read. +func (txn *KVTxn) SetPriority(pri Priority) { + txn.priority = pri + txn.GetSnapshot().SetPriority(pri) +} + // SetKVFilter sets the filter to ignore key-values in memory buffer. func (txn *KVTxn) SetKVFilter(filter KVFilter) { txn.kvFilter = filter