Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: distinguish the row and index operation type #29044

Merged
merged 97 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
97 commits
Select commit Hold shift + click to select a range
1a8573d
init
crazycs520 Aug 12, 2021
f59d00b
init
crazycs520 Aug 13, 2021
8c049fe
Merge branch 'master' into topsql-scan-row
mornyx Sep 1, 2021
e832bfd
Merge branch 'master' into topsql-scan-row
mornyx Sep 2, 2021
5478ab9
Merge branch 'master' into topsql-scan-row
mornyx Sep 27, 2021
3cee76c
Merge branch 'master' into resource-metering
mornyx Oct 13, 2021
7ae5bbd
Extend label of ResourceGroupTag
mornyx Oct 24, 2021
d4b5fe6
Merge branch 'master' into resource-metering
mornyx Oct 24, 2021
e8dd9d3
Fix unit tests
mornyx Oct 24, 2021
b6866b7
Merge branch 'master' into resource-metering
mornyx Oct 27, 2021
9d750ad
Remove mornyx/tipb
mornyx Oct 28, 2021
26a03b1
Merge branch 'master' into resource-metering
mornyx Oct 28, 2021
e2b0099
Run go mod tidy
mornyx Oct 28, 2021
dccdef5
Merge branch 'master' into resource-metering
mornyx Oct 28, 2021
1e66caa
Merge branch 'master' into resource-metering
mornyx Oct 28, 2021
9267abb
Add test cases for resource metering tag label
mornyx Oct 28, 2021
99380ba
Merge branch 'master' into resource-metering
mornyx Oct 28, 2021
35942bc
Fix tests
mornyx Oct 28, 2021
1b04965
Fix tests
mornyx Oct 28, 2021
af63673
Fix tests
mornyx Oct 28, 2021
e67f408
Distinguish row/index based on first key
mornyx Nov 3, 2021
941a53b
Merge branch 'master' into resource-metering
mornyx Nov 3, 2021
1777d41
Repair circular dependencies; Add unit-tests
mornyx Nov 3, 2021
1259c8b
Upgrade client-go
mornyx Nov 3, 2021
d90985d
tidy
mornyx Nov 3, 2021
2dd79e8
Merge branch 'master' into resource-metering
mornyx Nov 3, 2021
a6caf5b
Complete executor_test
mornyx Nov 3, 2021
7537bca
Use ResourceGroupTagFactory
mornyx Nov 3, 2021
40a09dc
Merge branch 'master' into resource-metering
mornyx Nov 3, 2021
ad1d725
Fix type conversion
mornyx Nov 3, 2021
a703a8a
Merge branch 'master' into resource-metering
mornyx Nov 4, 2021
4781623
Upgrade to ResourceGroupTagger
mornyx Nov 9, 2021
02f535e
Merge remote-tracking branch 'mornyx/resource-metering' into resource…
mornyx Nov 9, 2021
9ce5bca
Merge branch 'master' into resource-metering
mornyx Nov 9, 2021
0b7be5d
Update kv/option.go
mornyx Nov 9, 2021
e747587
Fix go.sum
mornyx Nov 10, 2021
eea3cda
Merge branch 'master' into resource-metering
mornyx Nov 10, 2021
5c960c3
Merge branch 'master' into resource-metering
mornyx Nov 10, 2021
eeaeb16
Merge branch 'master' into resource-metering
mornyx Nov 11, 2021
155348e
Merge branch 'master' into resource-metering
mornyx Nov 15, 2021
0fcd445
Upgrade client-go
mornyx Nov 15, 2021
e37e8a2
Fix resource group tag test assertion
mornyx Nov 15, 2021
70634be
Merge branch 'master' into resource-metering
mornyx Nov 15, 2021
f5d269e
Merge branch 'master' into resource-metering
mornyx Nov 15, 2021
ac6afb5
Merge branch 'master' into resource-metering
mornyx Nov 15, 2021
41f978a
Optimize resource group tagger for DAG request
mornyx Nov 15, 2021
46c6626
Remove print in test
mornyx Nov 15, 2021
50e829c
Merge branch 'master' into resource-metering
mornyx Nov 15, 2021
74c2b82
Merge branch 'master' into resource-metering
mornyx Nov 16, 2021
5489fa2
Update comment for SetFromSessionVars
mornyx Nov 16, 2021
06395ce
optimize
mornyx Nov 16, 2021
3251c73
Merge branch 'master' into resource-metering
mornyx Nov 16, 2021
2c41863
Add rowindexcodec
mornyx Nov 17, 2021
a2d214b
Add comments
mornyx Nov 17, 2021
7b70cab
Merge branch 'master' into resource-metering
mornyx Nov 17, 2021
cc352e1
Add license comment; Add main_test
mornyx Nov 17, 2021
3b6703e
Merge branch 'master' into resource-metering
mornyx Nov 17, 2021
7b026b2
Update executor/executor.go
mornyx Nov 18, 2021
d59055e
Rename setResourceGroupTagForTxn
mornyx Nov 18, 2021
23d5eaf
Merge branch 'master' into resource-metering
crazycs520 Nov 18, 2021
fd2ba1f
Set ResourceGroupTagger for LockCtx
mornyx Nov 18, 2021
cc25883
Fix executor test
mornyx Nov 19, 2021
df05935
Merge branch 'master' into resource-metering
mornyx Nov 19, 2021
c118a2e
Merge remote-tracking branch 'mornyx/resource-metering' into resource…
mornyx Nov 19, 2021
f341a3e
Fix executor test
mornyx Nov 19, 2021
74d4da8
Merge branch 'master' into resource-metering
mornyx Nov 19, 2021
3370349
Fix tests
mornyx Nov 19, 2021
e757948
Merge branch 'master' into resource-metering
mornyx Nov 19, 2021
e0e2d86
Merge branch 'master' into resource-metering
mornyx Nov 19, 2021
23f6e50
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
88e4bfe
Merge branch 'master' into resource-metering
breezewish Nov 22, 2021
d689ee1
Generate digest explicitly
mornyx Nov 22, 2021
6132d4b
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
d90c86f
Remove unnecessary judgment whether TopSQL is enabled
mornyx Nov 22, 2021
ae33f1a
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
09d5b98
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
77c6ced
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
9c32941
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
804ab5e
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
ff2fbd8
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
f63e2d9
Merge branch 'master' into resource-metering
mornyx Nov 22, 2021
9bea4a7
Fix plan digest
mornyx Nov 23, 2021
0a9516f
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
1c0bcc6
Fix
mornyx Nov 23, 2021
8443600
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
33780a2
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
abc4d09
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
754380d
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
82b0156
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
2113719
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
ce014d1
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
79cee96
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
e0668d8
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
110a222
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
37a684d
Merge branch 'master' into resource-metering
mornyx Nov 23, 2021
a0be0df
Merge branch 'master' into resource-metering
ti-chi-bot Nov 23, 2021
2a74c95
Merge branch 'master' into resource-metering
ti-chi-bot Nov 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead", "SchemaVar".
// "Concurrency", "IsolationLevel", "NotFillCache", "TaskID", "Priority", "ReplicaRead", "ResourceGroupTagger".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
if builder.Request.Concurrency == 0 {
// Concurrency may be set to 1 by SetDAGRequest
Expand All @@ -246,7 +246,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.GetReplicaRead()
builder.SetResourceGroupTag(sv.StmtCtx)
builder.SetResourceGroupTagger(sv.StmtCtx)
mornyx marked this conversation as resolved.
Show resolved Hide resolved
return builder
}

Expand Down Expand Up @@ -282,10 +282,10 @@ func (builder *RequestBuilder) SetFromInfoSchema(pis interface{}) *RequestBuilde
return builder
}

// SetResourceGroupTag sets the request resource group tag.
func (builder *RequestBuilder) SetResourceGroupTag(sc *stmtctx.StatementContext) *RequestBuilder {
// SetResourceGroupTagger sets the request resource group tagger.
func (builder *RequestBuilder) SetResourceGroupTagger(sc *stmtctx.StatementContext) *RequestBuilder {
if variable.TopSQLEnabled() {
builder.Request.ResourceGroupTag = sc.GetResourceGroupTag()
builder.Request.ResourceGroupTagger = sc.GetResourceGroupTagger()
}
return builder
}
Expand Down
8 changes: 4 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
kvReqBuilder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
kvReqBuilder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx)
kvReq, err := kvReqBuilder.
SetAnalyzeRequest(e.analyzePB).
SetStartTS(e.snapshot).
Expand Down Expand Up @@ -750,7 +750,7 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {
func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
builder.SetResourceGroupTag(e.ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx)
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
kvReq, err := reqBuilder.
Expand Down Expand Up @@ -1853,7 +1853,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
Expand All @@ -1874,7 +1874,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.NotFillCache, true)
snapshot.SetOption(kv.IsolationLevel, kv.SI)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, readReplicaType)
Expand Down
2 changes: 1 addition & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
},
})
}
setResourceGroupTagForTxn(stmtCtx, snapshot)
setResourceGroupTaggerForTxn(stmtCtx, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
4 changes: 2 additions & 2 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
}

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx)
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand All @@ -258,7 +258,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
ranges := ranger.FullRange()

var builder distsql.RequestBuilder
builder.SetResourceGroupTag(ctx.GetSessionVars().StmtCtx)
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx)
return builder.SetIndexRanges(ctx.GetSessionVars().StmtCtx, tableID, indexInfo.ID, ranges).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
Expand Down
25 changes: 16 additions & 9 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
Expand All @@ -39,7 +40,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -968,18 +968,25 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.LockCtx {
var planDigest *parser.Digest
_, sqlDigest := seVars.StmtCtx.SQLDigest()
if variable.TopSQLEnabled() {
_, planDigest = seVars.StmtCtx.GetPlanDigest()
}
lockCtx := tikvstore.NewLockCtx(seVars.TxnCtx.GetForUpdateTS(), lockWaitTime, seVars.StmtCtx.GetLockWaitStartTime())
lockCtx.Killed = &seVars.Killed
lockCtx.PessimisticLockWaited = &seVars.StmtCtx.PessimisticLockWaited
lockCtx.LockKeysDuration = &seVars.StmtCtx.LockKeysDuration
lockCtx.LockKeysCount = &seVars.StmtCtx.LockKeysCount
lockCtx.LockExpired = &seVars.TxnCtx.LockExpire
lockCtx.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(sqlDigest, planDigest)
lockCtx.ResourceGroupTagger = func(req *kvrpcpb.PessimisticLockRequest) []byte {
if req == nil {
return nil
}
if len(req.Mutations) == 0 {
return nil
}
if mutation := req.Mutations[0]; mutation != nil {
label := resourcegrouptag.GetResourceGroupLabelByKey(mutation.Key)
return seVars.StmtCtx.GetResourceGroupTagByLabel(label)
}
return nil
}
lockCtx.OnDeadlock = func(deadlock *tikverr.ErrDeadlock) {
cfg := config.GetGlobalConfig()
if deadlock.IsRetryable && !cfg.PessimisticTxn.DeadlockHistoryCollectRetryable {
Expand Down Expand Up @@ -1896,8 +1903,8 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
return nil
}

func setResourceGroupTagForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snapshot) {
if snapshot != nil && variable.TopSQLEnabled() {
snapshot.SetOption(kv.ResourceGroupTag, sc.GetResourceGroupTag())
snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
}
}
89 changes: 77 additions & 12 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8692,6 +8692,7 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
defer failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/unistoreRPCClientSendHook")

var sqlDigest, planDigest *parser.Digest
var tagLabel tipb.ResourceGroupTagLabel
checkFn := func() {}
unistore.UnistoreRPCClientSendHook = func(req *tikvrpc.Request) {
var startKey []byte
Expand Down Expand Up @@ -8734,6 +8735,7 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
c.Assert(err, IsNil)
sqlDigest = parser.NewDigest(tag.SqlDigest)
planDigest = parser.NewDigest(tag.PlanDigest)
tagLabel = *tag.Label
checkFn()
}

Expand All @@ -8743,19 +8745,78 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
}

cases := []struct {
sql string
ignore bool
sql string
tagLabels []tipb.ResourceGroupTagLabel
ignore bool
}{
{sql: "insert into t values(1,1),(2,2),(3,3)"},
{sql: "select * from t use index (idx) where a=1"},
{sql: "select * from t use index (idx) where a in (1,2,3)"},
{sql: "select * from t use index (idx) where a>1"},
{sql: "select * from t where b>1"},
{sql: "begin pessimistic", ignore: true},
{sql: "insert into t values(4,4)"},
{sql: "commit", ignore: true},
{sql: "update t set a=5,b=5 where a=5"},
{sql: "replace into t values(6,6)"},
{
sql: "insert into t values(1,1),(2,2),(3,3)",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
},
},
{
sql: "select * from t use index (idx) where a=1",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow,
},
},
{
sql: "select * from t use index (idx) where a in (1,2,3)",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow,
},
},
{
sql: "select * from t use index (idx) where a>1",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow,
},
mornyx marked this conversation as resolved.
Show resolved Hide resolved
},
{
sql: "select * from t where b>1",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow,
},
},
{
sql: "select a from t use index (idx) where a>1",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
},
},
{
sql: "begin pessimistic",
ignore: true,
},
{
sql: "insert into t values(4,4)",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelRow,
},
},
{
sql: "commit",
ignore: true,
},
{
sql: "update t set a=5,b=5 where a=5",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
},
},
{
sql: "replace into t values(6,6)",
tagLabels: []tipb.ResourceGroupTagLabel{
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
tipb.ResourceGroupTagLabel_ResourceGroupTagLabelIndex,
},
},
}
for _, ca := range cases {
resetVars()
Expand All @@ -8777,6 +8838,10 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
}
c.Assert(sqlDigest.String(), Equals, expectSQLDigest.String(), commentf)
c.Assert(planDigest.String(), Equals, expectPlanDigest.String())
if len(ca.tagLabels) > 0 {
c.Assert(tagLabel, Equals, ca.tagLabels[0])
ca.tagLabels = ca.tagLabels[1:] // next label
}
checkCnt++
}

Expand Down
2 changes: 1 addition & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
if err != nil {
return err
}
setResourceGroupTagForTxn(sessVars.StmtCtx, txn)
setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn)
txnSize := txn.Size()
sessVars.StmtCtx.AddRecordRows(uint64(len(rows)))
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
Expand Down
2 changes: 1 addition & 1 deletion executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
panic("point get replica option fail")
}
})
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
defer snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
}
setResourceGroupTagForTxn(e.ctx.GetSessionVars().StmtCtx, txn)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, txn)
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
Expand Down
2 changes: 1 addition & 1 deletion executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
if variable.TopSQLEnabled() {
txn, err := e.ctx.Txn(true)
if err == nil {
txn.SetOption(kv.ResourceGroupTag, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTag())
txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0 h1:c12Pv8Xks4oubDr/uHHxrlBkwGJFqKZUEIUemHV794g=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211115071040-a3f1c41ac1a0/go.mod h1:iiwtsCxcbNLK5i9VRYGvdcihgHXTKy2ukWjoaJsrphg=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f h1:UyJjp3wGIjf1edGiQiIdAtL5QFqaqR4+s3LDwUZU7NY=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f/go.mod h1:BEAS0vXm5BorlF/HTndqGwcGDvaiwe7B7BkfgwwZMJ4=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 h1:nFm1jQDz1iRktoyV2SyM5zVk6+PJHQNunJZ7ZJcqzAo=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379/go.mod h1:y+09hAUXJbrd4c0nktL74zXDDuD7atGtfOKxL90PCOE=
Expand Down
7 changes: 4 additions & 3 deletions infoschema/cluster_tables_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -160,10 +161,10 @@ func SubTestTestDataLockWaits(s *clusterTablesSuite) func(*testing.T) {
_, digest1 := parser.NormalizeDigest("select * from test_data_lock_waits for update")
_, digest2 := parser.NormalizeDigest("update test_data_lock_waits set f1=1 where id=2")
s.store.(mockstorage.MockLockWaitSetter).SetMockLockWaits([]*deadlock.WaitForEntry{
{Txn: 1, WaitForTxn: 2, Key: []byte("key1"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)},
{Txn: 3, WaitForTxn: 4, Key: []byte("key2"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)},
{Txn: 1, WaitForTxn: 2, Key: []byte("key1"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)},
{Txn: 3, WaitForTxn: 4, Key: []byte("key2"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)},
// Invalid digests
{Txn: 5, WaitForTxn: 6, Key: []byte("key3"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(nil, nil)},
{Txn: 5, WaitForTxn: 6, Key: []byte("key3"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(nil, nil, tipb.ResourceGroupTagLabel_ResourceGroupTagLabelUnknown)},
{Txn: 7, WaitForTxn: 8, Key: []byte("key4"), ResourceGroupTag: []byte("asdfghjkl")},
})

Expand Down
5 changes: 3 additions & 2 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
)

// UnCommitIndexKVFlag uses to indicate the index key/value is no need to commit.
Expand Down Expand Up @@ -335,8 +336,8 @@ type Request struct {
IsStaleness bool
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels []*metapb.StoreLabel
// ResourceGroupTag indicates the kv request task group.
ResourceGroupTag []byte
// ResourceGroupTagger indicates the kv request task group tagger.
ResourceGroupTagger tikvrpc.ResourceGroupTagger
}

const (
Expand Down
5 changes: 4 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,13 @@ const (
IsStalenessReadOnly
// MatchStoreLabels indicates the labels the store should be matched
MatchStoreLabels
// ResourceGroupTag indicates the resource group of the kv request.
// ResourceGroupTag indicates the resource group tag of the kv request.
ResourceGroupTag
// ResourceGroupTagger can be used to set the ResourceGroupTag dynamically according to the request content. It will be used only when ResourceGroupTag is nil.
ResourceGroupTagger
// KVFilter indicates the filter to ignore key-values in the transaction's memory buffer.
KVFilter

// SnapInterceptor is used for setting the interceptor for snapshot
SnapInterceptor
)
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (s *session) doCommit(ctx context.Context) error {
}
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
s.txn.SetOption(kv.ResourceGroupTag, sessVars.StmtCtx.GetResourceGroupTag())
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
Expand Down
Loading