Merge branch 'master' into options-commit-hook
disksing committed Apr 29, 2021
2 parents 01ec17b + 68a22ed commit 3dbd168
Showing 28 changed files with 184 additions and 103 deletions.
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -159,7 +159,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t tab
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
priority: tikvstore.PriorityLow,
priority: kv.PriorityLow,
}
}

3 changes: 1 addition & 2 deletions ddl/reorg.go
@@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
@@ -425,7 +424,7 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
SetConcurrency(1).SetDesc(true)

builder.Request.NotFillCache = true
builder.Request.Priority = tikvstore.PriorityLow
builder.Request.Priority = kv.PriorityLow

kvReq, err := builder.Build()
if err != nil {
11 changes: 5 additions & 6 deletions distsql/request_builder.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
@@ -146,7 +145,7 @@ func (builder *RequestBuilder) SetAnalyzeRequest(ana *tipb.AnalyzeReq) *RequestB
builder.Request.Data, builder.err = ana.Marshal()
builder.Request.NotFillCache = true
builder.Request.IsolationLevel = kv.RC
builder.Request.Priority = tikvstore.PriorityLow
builder.Request.Priority = kv.PriorityLow
}

return builder
@@ -210,13 +209,13 @@ func (builder *RequestBuilder) getIsolationLevel() kv.IsoLevel {
func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
switch sv.StmtCtx.Priority {
case mysql.NoPriority, mysql.DelayedPriority:
return tikvstore.PriorityNormal
return kv.PriorityNormal
case mysql.LowPriority:
return tikvstore.PriorityLow
return kv.PriorityLow
case mysql.HighPriority:
return tikvstore.PriorityHigh
return kv.PriorityHigh
}
return tikvstore.PriorityNormal
return kv.PriorityNormal
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
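Note on the getKVPriority hunk above: only the constant package changes. Statement priorities from the parser (mysql.NoPriority, mysql.DelayedPriority, mysql.LowPriority, mysql.HighPriority) now map onto kv.PriorityNormal, kv.PriorityLow and kv.PriorityHigh instead of the tikvstore copies. A minimal table-driven sketch of that mapping (hypothetical test, not part of this commit; it assumes the parser's mysql.PriorityEnum type):

```go
package distsql

import (
	"testing"

	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/sessionctx/variable"
)

// TestGetKVPrioritySketch is a hypothetical test, not part of this commit.
func TestGetKVPrioritySketch(t *testing.T) {
	cases := []struct {
		stmtPri mysql.PriorityEnum
		want    int
	}{
		{mysql.NoPriority, kv.PriorityNormal},
		{mysql.DelayedPriority, kv.PriorityNormal},
		{mysql.LowPriority, kv.PriorityLow},
		{mysql.HighPriority, kv.PriorityHigh},
	}
	for _, c := range cases {
		sv := &variable.SessionVars{StmtCtx: &stmtctx.StatementContext{Priority: c.stmtPri}}
		// getKVPriority only inspects sv.StmtCtx.Priority, so a zero-value builder is enough here.
		if got := (&RequestBuilder{}).getKVPriority(sv); got != c.want {
			t.Fatalf("priority %v: got %d, want %d", c.stmtPri, got, c.want)
		}
	}
}
```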
10 changes: 5 additions & 5 deletions executor/adapter.go
@@ -213,7 +213,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
if err != nil {
return nil, err
}
a.Ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityHigh
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

// try to reuse point get executor
if a.PsStmt.Executor != nil {
@@ -730,15 +730,15 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
if stmtPri := stmtCtx.Priority; stmtPri == mysql.NoPriority {
switch {
case useMaxTS:
stmtCtx.Priority = tikvstore.PriorityHigh
stmtCtx.Priority = kv.PriorityHigh
case a.LowerPriority:
stmtCtx.Priority = tikvstore.PriorityLow
stmtCtx.Priority = kv.PriorityLow
}
}
}
}
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema)
@@ -758,7 +758,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
a.isPreparedStmt = true
a.Plan = executorExec.plan
if executorExec.lowerPriority {
ctx.GetSessionVars().StmtCtx.Priority = tikvstore.PriorityLow
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}
e = executorExec.stmtExec
}
4 changes: 2 additions & 2 deletions executor/analyze.go
@@ -964,7 +964,7 @@ func (e *AnalyzeFastExec) activateTxnForRowCount() (rollbackFn func() error, err
return nil, errors.Trace(err)
}
}
txn.SetOption(tikvstore.Priority, tikvstore.PriorityLow)
txn.SetOption(tikvstore.Priority, kv.PriorityLow)
txn.SetOption(tikvstore.IsolationLevel, kv.RC)
txn.SetOption(tikvstore.NotFillCache, true)
return rollbackFn, nil
@@ -1186,7 +1186,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot := e.ctx.GetStore().GetSnapshot(kv.MaxVersion)
snapshot.SetOption(tikvstore.NotFillCache, true)
snapshot.SetOption(tikvstore.IsolationLevel, kv.RC)
snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityLow)
snapshot.SetOption(tikvstore.Priority, kv.PriorityLow)
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(tikvstore.ReplicaRead, tikvstore.ReplicaReadFollower)
}
7 changes: 7 additions & 0 deletions kv/kv.go
@@ -439,6 +439,13 @@ type SplittableStore interface {
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Priority value for transaction priority.
const (
PriorityNormal = iota
PriorityLow
PriorityHigh
)

// IsoLevel is the transaction's isolation level.
type IsoLevel int

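The new constants are declared with iota, so PriorityNormal is 0, PriorityLow is 1 and PriorityHigh is 2; the compatibility check in the meta/meta.go hunk further down relies on that ordering. A minimal sketch, assuming only the constant block above:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
)

func main() {
	// The iota block above yields 0, 1, 2 in declaration order.
	fmt.Println(kv.PriorityNormal, kv.PriorityLow, kv.PriorityHigh) // 0 1 2

	// Same shape as the compatibility check in meta/meta.go below: any value
	// outside [PriorityNormal, PriorityHigh] falls back to PriorityLow.
	pri := 42 // hypothetical priority decoded from an old-version DDL job
	if pri < kv.PriorityNormal || pri > kv.PriorityHigh {
		pri = kv.PriorityLow
	}
	fmt.Println(pri) // 1
}
```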
2 changes: 1 addition & 1 deletion kv/mock_test.go
@@ -35,7 +35,7 @@ func (s testMockSuite) TestInterface(c *C) {
snapshot := storage.GetSnapshot(version)
_, err = snapshot.BatchGet(context.Background(), []Key{Key("abc"), Key("def")})
c.Check(err, IsNil)
snapshot.SetOption(tikvstore.Priority, tikvstore.PriorityNormal)
snapshot.SetOption(tikvstore.Priority, PriorityNormal)

transaction, err := storage.Begin()
c.Check(err, IsNil)
12 changes: 6 additions & 6 deletions meta/meta.go
@@ -94,8 +94,8 @@ type Meta struct {
// NewMeta creates a Meta in transaction txn.
// If the current Meta needs to handle a job, jobListKey is the type of the job's list.
func NewMeta(txn kv.Transaction, jobListKeys ...JobListKeyType) *Meta {
txn.SetOption(tikvstore.Priority, tikvstore.PriorityHigh)
txn.SetOption(tikvstore.SyncLog, true)
txn.SetOption(tikvstore.Priority, kv.PriorityHigh)
txn.SetOption(tikvstore.SyncLog, struct{}{})
t := structure.NewStructure(txn, txn, mMetaPrefix)
listKey := DefaultJobListKey
if len(jobListKeys) != 0 {
@@ -636,13 +636,13 @@ func (m *Meta) getDDLJob(key []byte, index int64) (*model.Job, error) {

job := &model.Job{
// For compatibility, if the job is enqueued by old version TiDB and Priority field is omitted,
// set the default priority to tikvstore.PriorityLow.
Priority: tikvstore.PriorityLow,
// set the default priority to kv.PriorityLow.
Priority: kv.PriorityLow,
}
err = job.Decode(value)
// Check if the job.Priority is valid.
if job.Priority < tikvstore.PriorityNormal || job.Priority > tikvstore.PriorityHigh {
job.Priority = tikvstore.PriorityLow
if job.Priority < kv.PriorityNormal || job.Priority > kv.PriorityHigh {
job.Priority = kv.PriorityLow
}
return job, errors.Trace(err)
}
31 changes: 31 additions & 0 deletions planner/core/integration_test.go
@@ -3524,3 +3524,34 @@ func (s *testIntegrationSuite) TestIssue24095(c *C) {
tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testIntegrationSuite) TestConflictReadFromStorage(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`create table t (
a int, b int, c varchar(20),
primary key(a), key(b), key(c)
) partition by range columns(a) (
partition p0 values less than(6),
partition p1 values less than(11),
partition p2 values less than(16));`)
tk.MustExec(`insert into t values (1,1,"1"), (2,2,"2"), (8,8,"8"), (11,11,"11"), (15,15,"15")`)
// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Se)
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
c.Assert(exists, IsTrue)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
tk.MustQuery(`explain select /*+ read_from_storage(tikv[t partition(p0)], tiflash[t partition(p1, p2)]) */ * from t`)
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1815 Storage hints are conflict, you can only specify one storage type of table test.t"))
tk.MustQuery(`explain select /*+ read_from_storage(tikv[t], tiflash[t]) */ * from t`)
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1815 Storage hints are conflict, you can only specify one storage type of table test.t"))
}
7 changes: 3 additions & 4 deletions planner/core/logical_plan_builder.go
@@ -610,10 +610,9 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) {
}
}
if hintTbl := hintInfo.ifPreferTiFlash(alias); hintTbl != nil {
// 1. `ds.tableInfo.Partition == nil`, which means the hint takes effect in the whole table.
// 2. `ds.preferStoreType != 0`, which means there's a hint hit the both TiKV value and TiFlash value for table.
// If it's satisfied the above two conditions, then we can make sure there are some hints conflicted.
if ds.preferStoreType != 0 && ds.tableInfo.Partition == nil {
// `ds.preferStoreType != 0`, which means there's a hint hit the both TiKV value and TiFlash value for table.
// We can't support read a table from two different storages, even partition table.
if ds.preferStoreType != 0 {
errMsg := fmt.Sprintf("Storage hints are conflict, you can only specify one storage type of table %s.%s",
alias.dbName.L, alias.tblName.L)
warning := ErrInternal.GenWithStack(errMsg)
3 changes: 1 addition & 2 deletions planner/core/testdata/integration_suite_in.json
@@ -168,8 +168,7 @@
"select /*+ hash_join(t1, t2 partition(p0)) */ * from t t1 join t t2 on t1.a = t2.a",
"select /*+ use_index_merge(t partition(p0)) */ * from t where t.b = 1 or t.c = \"8\"",
"select /*+ use_index_merge(t partition(p0, p1) primary, b) */ * from t where t.a = 1 or t.b = 2",
"select /*+ use_index(t partition(p0) b) */ * from t partition(p0, p1)",
"select /*+ read_from_storage(tikv[t partition(p0)], tiflash[t partition(p1, p2)]) */ * from t"
"select /*+ use_index(t partition(p0) b) */ * from t partition(p0, p1)"
]
},
{
13 changes: 0 additions & 13 deletions planner/core/testdata/integration_suite_out.json
@@ -879,19 +879,6 @@
" └─TableFullScan 10000.00 cop[tiflash] table:t, partition:p1 keep order:false, stats:pseudo"
],
"Warn": null
},
{
"SQL": "select /*+ read_from_storage(tikv[t partition(p0)], tiflash[t partition(p1, p2)]) */ * from t",
"Plan": [
"PartitionUnion 30000.00 root ",
"├─TableReader 10000.00 root data:TableFullScan",
"│ └─TableFullScan 10000.00 cop[tikv] table:t, partition:p0 keep order:false, stats:pseudo",
"├─TableReader 10000.00 root data:TableFullScan",
"│ └─TableFullScan 10000.00 cop[tiflash] table:t, partition:p1 keep order:false, stats:pseudo",
"└─TableReader 10000.00 root data:TableFullScan",
" └─TableFullScan 10000.00 cop[tiflash] table:t, partition:p2 keep order:false, stats:pseudo"
],
"Warn": null
}
]
},
2 changes: 1 addition & 1 deletion session/session.go
@@ -1714,7 +1714,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
s.txn.changeToInvalid()
case *plannercore.Update:
s.PrepareTSFuture(ctx)
stmtCtx.Priority = tikvstore.PriorityHigh
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
case nil:
// cache is invalid
10 changes: 5 additions & 5 deletions sessionctx/variable/session.go
@@ -962,7 +962,7 @@ func NewSessionVars() *SessionVars {
OptimizerSelectivityLevel: DefTiDBOptimizerSelectivityLevel,
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: tikvstore.PriorityLow,
DDLReorgPriority: kv.PriorityLow,
allowInSubqToJoinAndAgg: DefOptInSubqToJoinAndAgg,
preferRangeScan: DefOptPreferRangeScan,
CorrelationThreshold: DefOptCorrelationThreshold,
@@ -1308,13 +1308,13 @@ func (s *SessionVars) setDDLReorgPriority(val string) {
val = strings.ToLower(val)
switch val {
case "priority_low":
s.DDLReorgPriority = tikvstore.PriorityLow
s.DDLReorgPriority = kv.PriorityLow
case "priority_normal":
s.DDLReorgPriority = tikvstore.PriorityNormal
s.DDLReorgPriority = kv.PriorityNormal
case "priority_high":
s.DDLReorgPriority = tikvstore.PriorityHigh
s.DDLReorgPriority = kv.PriorityHigh
default:
s.DDLReorgPriority = tikvstore.PriorityLow
s.DDLReorgPriority = kv.PriorityLow
}
}

2 changes: 1 addition & 1 deletion store/copr/batch_coprocessor.go
@@ -346,7 +346,7 @@ func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *tikv.Backoffe

req := tikvrpc.NewRequest(task.cmdType, &copReq, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(b.req.IsolationLevel),
Priority: tikv.PriorityToPB(b.req.Priority),
Priority: priorityToPB(b.req.Priority),
NotFillCache: b.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
14 changes: 13 additions & 1 deletion store/copr/coprocessor.go
@@ -698,7 +698,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *tikv.Backoffer, task *copTas

req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, worker.req.ReplicaRead, &worker.replicaReadSeed, kvrpcpb.Context{
IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
Priority: tikv.PriorityToPB(worker.req.Priority),
Priority: priorityToPB(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
RecordTimeStat: true,
RecordScanStat: true,
@@ -1192,6 +1192,18 @@ func (e *rateLimitAction) isEnabled() bool {
return atomic.LoadUint32(&e.enabled) > 0
}

// priorityToPB converts priority type to wire type.
func priorityToPB(pri int) kvrpcpb.CommandPri {
switch pri {
case kv.PriorityLow:
return kvrpcpb.CommandPri_Low
case kv.PriorityHigh:
return kvrpcpb.CommandPri_High
default:
return kvrpcpb.CommandPri_Normal
}
}

func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
switch level {
case kv.RC:
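The new unexported priorityToPB replaces the tikv.PriorityToPB helper that the coprocessor code called before this commit (see the batch_coprocessor.go and coprocessor.go hunks above), mapping the kv-level integers onto the kvrpcpb wire enum. A hypothetical round-trip check, assuming only the switch shown above:

```go
package copr

import (
	"testing"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/kv"
)

// TestPriorityToPBSketch is a hypothetical test, not part of this commit.
func TestPriorityToPBSketch(t *testing.T) {
	cases := map[int]kvrpcpb.CommandPri{
		kv.PriorityNormal: kvrpcpb.CommandPri_Normal,
		kv.PriorityLow:    kvrpcpb.CommandPri_Low,
		kv.PriorityHigh:   kvrpcpb.CommandPri_High,
		42:                kvrpcpb.CommandPri_Normal, // unknown values fall back to Normal
	}
	for in, want := range cases {
		if got := priorityToPB(in); got != want {
			t.Fatalf("priorityToPB(%d) = %v, want %v", in, got, want)
		}
	}
}
```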
5 changes: 5 additions & 0 deletions store/driver/txn/error.go
@@ -156,6 +156,10 @@ func toTiDBErr(err error) error {
return kv.ErrNotExist
}

if e, ok := err.(*tikverr.ErrWriteConflictInLatch); ok {
return kv.ErrWriteConflictInTiDB.FastGenByArgs(e.StartTS)
}

if e, ok := err.(*tikverr.ErrTxnTooLarge); ok {
return kv.ErrTxnTooLarge.GenWithStackByArgs(e.Size)
}
@@ -171,6 +175,7 @@ func toTiDBErr(err error) error {
if errors.ErrorEqual(err, tikverr.ErrInvalidTxn) {
return kv.ErrInvalidTxn
}

return errors.Trace(err)
}

17 changes: 17 additions & 0 deletions store/driver/txn/snapshot.go
@@ -67,6 +67,12 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
s.KVSnapshot.SetIsolationLevel(level)
case tikvstore.Priority:
s.KVSnapshot.SetPriority(getTiKVPriority(val.(int)))
case tikvstore.NotFillCache:
s.KVSnapshot.SetNotFillCache(val.(bool))
case tikvstore.SnapshotTS:
s.KVSnapshot.SetSnapshotTS(val.(uint64))
default:
s.KVSnapshot.SetOption(opt, val)
}
@@ -87,3 +93,14 @@ func getTiKVIsolationLevel(level kv.IsoLevel) tikv.IsoLevel {
return tikv.SI
}
}

func getTiKVPriority(pri int) tikv.Priority {
switch pri {
case kv.PriorityHigh:
return tikv.PriorityHigh
case kv.PriorityLow:
return tikv.PriorityLow
default:
return tikv.PriorityNormal
}
}
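With these new cases, snapshot options keep their tikvstore keys, but the priority value passed to SetOption is now the kv-level constant, which getTiKVPriority translates to the client-side tikv.Priority type. A minimal sketch mirroring the executor/analyze.go call sites above (the helper name is hypothetical):

```go
package example

import (
	"github.com/pingcap/tidb/kv"
	tikvstore "github.com/pingcap/tidb/store/tikv/kv"
)

// lowPrioritySnapshot is a hypothetical helper, not part of this commit.
func lowPrioritySnapshot(store kv.Storage) kv.Snapshot {
	snapshot := store.GetSnapshot(kv.MaxVersion)
	snapshot.SetOption(tikvstore.NotFillCache, true)       // forwarded to SetNotFillCache
	snapshot.SetOption(tikvstore.IsolationLevel, kv.RC)     // mapped by getTiKVIsolationLevel
	snapshot.SetOption(tikvstore.Priority, kv.PriorityLow)  // mapped by getTiKVPriority
	return snapshot
}
```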
8 changes: 8 additions & 0 deletions store/driver/txn/txn_driver.go
@@ -134,8 +134,16 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
txn.KVTxn.GetSnapshot().SetIsolationLevel(level)
case tikvstore.Priority:
txn.KVTxn.SetPriority(getTiKVPriority(val.(int)))
case tikvstore.NotFillCache:
txn.KVTxn.GetSnapshot().SetNotFillCache(val.(bool))
case tikvstore.SyncLog:
txn.EnableForceSyncLog()
case tikvstore.Pessimistic:
txn.SetPessimistic(val.(bool))
case tikvstore.SnapshotTS:
txn.KVTxn.GetSnapshot().SetSnapshotTS(val.(uint64))
case tikvstore.CommitHook:
txn.SetCommitCallback(val.(func(string, error)))
default:
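The transaction side gains the same explicit cases, plus Pessimistic, SnapshotTS and the CommitHook option this branch is named after; the CommitHook value must be a func(string, error), which the driver forwards to SetCommitCallback as shown in the switch above. A minimal sketch of installing a hook on a kv.Transaction (helper name and logging are illustrative assumptions):

```go
package example

import (
	"log"

	"github.com/pingcap/tidb/kv"
	tikvstore "github.com/pingcap/tidb/store/tikv/kv"
)

// installCommitHook is a hypothetical helper, not part of this commit.
func installCommitHook(txn kv.Transaction) {
	txn.SetOption(tikvstore.Priority, kv.PriorityHigh) // translated by getTiKVPriority
	// The CommitHook value must be a func(string, error); the driver hands it
	// to SetCommitCallback, as shown in the case above.
	txn.SetOption(tikvstore.CommitHook, func(info string, err error) {
		if err != nil {
			log.Printf("commit callback: %v (%s)", err, info)
		}
	})
}
```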