Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txnkv: add callback for setting ResourceGroupTag #368

Merged
merged 11 commits into from
Nov 15, 2021
4 changes: 2 additions & 2 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ type twoPhaseCommitter struct {

binlog BinlogExecutor

resourceGroupTag []byte
resourceGroupTagFactory util.ResourceGroupTagFactory

// allowed when tikv disk full happened.
diskFullOpt kvrpcpb.DiskFullOpt
Expand Down Expand Up @@ -494,7 +494,7 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.resourceGroupTagFactory = txn.resourceGroupTagFactory
c.setDetail(commitDetail)
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions txnkv/transaction/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand All @@ -59,11 +60,15 @@ func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
Keys: keys,
StartVersion: c.startTS,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
if c.resourceGroupTagFactory != nil && len(keys) > 0 {
req.ResourceGroupTag = c.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: keys[0]})
}
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions txnkv/transaction/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand All @@ -70,9 +71,11 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
StartVersion: c.startTS,
Keys: keys,
CommitVersion: c.commitTS,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog,
ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt,
}, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, DiskFullOpt: c.diskFullOpt,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
if c.resourceGroupTagFactory != nil && len(keys) > 0 {
req.ResourceGroupTag = c.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: keys[0]})
}

tBegin := time.Now()
attempts := 0
Expand Down
8 changes: 7 additions & 1 deletion txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,14 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u
req.TryOnePc = true
}

var resourceGroupTag []byte
if c.resourceGroupTagFactory != nil && len(mutations) > 0 {
if mutation := mutations[0]; mutation != nil {
resourceGroupTag = c.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: mutation.Key})
}
}
return tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req,
kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag,
kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: resourceGroupTag,
DiskFullOpt: c.diskFullOpt, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())})
}

Expand Down
32 changes: 16 additions & 16 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,18 @@ type KVTxn struct {
// commitCallback is called after current transaction gets committed
commitCallback func(info string, err error)

binlog BinlogExecutor
schemaLeaseChecker SchemaLeaseChecker
syncLog bool
priority txnutil.Priority
isPessimistic bool
enableAsyncCommit bool
enable1PC bool
causalConsistency bool
scope string
kvFilter KVFilter
resourceGroupTag []byte
diskFullOpt kvrpcpb.DiskFullOpt
binlog BinlogExecutor
schemaLeaseChecker SchemaLeaseChecker
syncLog bool
priority txnutil.Priority
isPessimistic bool
enableAsyncCommit bool
enable1PC bool
causalConsistency bool
scope string
kvFilter KVFilter
resourceGroupTagFactory util.ResourceGroupTagFactory
diskFullOpt kvrpcpb.DiskFullOpt
}

// NewTiKVTxn creates a new KVTxn.
Expand Down Expand Up @@ -226,10 +226,10 @@ func (txn *KVTxn) SetPriority(pri txnutil.Priority) {
txn.GetSnapshot().SetPriority(pri)
}

// SetResourceGroupTag sets the resource tag for both write and read.
func (txn *KVTxn) SetResourceGroupTag(tag []byte) {
txn.resourceGroupTag = tag
txn.GetSnapshot().SetResourceGroupTag(tag)
// SetResourceGroupTagFactory sets the resource tag factory for both write and read.
func (txn *KVTxn) SetResourceGroupTagFactory(f util.ResourceGroupTagFactory) {
txn.resourceGroupTagFactory = f
txn.GetSnapshot().SetResourceGroupTagFactory(f)
}

// SetSchemaAmender sets an amender to update mutations after schema change.
Expand Down
21 changes: 13 additions & 8 deletions txnkv/txnsnapshot/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -216,10 +217,9 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
}
sreq := &kvrpcpb.ScanRequest{
Context: &kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
ResourceGroupTag: s.snapshot.resourceGroupTag,
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
IsolationLevel: s.snapshot.isolationLevel.ToPB(),
},
StartKey: s.nextStartKey,
EndKey: reqEndKey,
Expand All @@ -233,13 +233,18 @@ func (s *Scanner) getData(bo *retry.Backoffer) error {
sreq.EndKey = reqStartKey
sreq.Reverse = true
}
if s.snapshot.resourceGroupTagFactory != nil {
sreq.Context.ResourceGroupTag = s.snapshot.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: sreq.StartKey})
}
s.snapshot.mu.RLock()
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, sreq, s.snapshot.mu.replicaRead, &s.snapshot.replicaReadSeed, kvrpcpb.Context{
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
ResourceGroupTag: s.snapshot.resourceGroupTag,
Priority: s.snapshot.priority.ToPB(),
NotFillCache: s.snapshot.notFillCache,
TaskId: s.snapshot.mu.taskID,
})
if s.snapshot.resourceGroupTagFactory != nil {
req.ResourceGroupTag = s.snapshot.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: sreq.StartKey})
}
s.snapshot.mu.RUnlock()
resp, err := sender.SendReq(bo, req, loc.Region, client.ReadTimeoutMedium)
if err != nil {
Expand Down
30 changes: 17 additions & 13 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ type KVSnapshot struct {
matchStoreLabels []*metapb.StoreLabel
}
sampleStep uint32
// resourceGroupTag is use to set the kv request resource group tag.
resourceGroupTag []byte
// resourceGroupTagFactory is use to set the kv request resource group tag.
resourceGroupTagFactory util.ResourceGroupTagFactory
}

// NewTiKVSnapshot creates a snapshot of an TiKV store.
Expand Down Expand Up @@ -348,11 +348,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
Keys: pending,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.resourceGroupTag,
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
})
if s.resourceGroupTagFactory != nil && len(pending) > 0 {
req.ResourceGroupTag = s.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: pending[0]})
}
scope := s.mu.readReplicaScope
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
Expand Down Expand Up @@ -515,11 +517,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
Key: k,
Version: s.version,
}, s.mu.replicaRead, &s.replicaReadSeed, kvrpcpb.Context{
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
ResourceGroupTag: s.resourceGroupTag,
Priority: s.priority.ToPB(),
NotFillCache: s.notFillCache,
TaskId: s.mu.taskID,
})
if s.resourceGroupTagFactory != nil {
req.ResourceGroupTag = s.resourceGroupTagFactory(util.ResourceGroupTagParams{FirstKey: k})
}
isStaleness := s.mu.isStaleness
matchStoreLabels := s.mu.matchStoreLabels
scope := s.mu.readReplicaScope
Expand Down Expand Up @@ -714,9 +718,9 @@ func (s *KVSnapshot) SetMatchStoreLabels(labels []*metapb.StoreLabel) {
s.mu.matchStoreLabels = labels
}

// SetResourceGroupTag sets resource group of the kv request.
func (s *KVSnapshot) SetResourceGroupTag(tag []byte) {
s.resourceGroupTag = tag
// SetResourceGroupTagFactory sets resource group tag factory of the kv request.
func (s *KVSnapshot) SetResourceGroupTagFactory(f util.ResourceGroupTagFactory) {
s.resourceGroupTagFactory = f
}

// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
Expand Down
10 changes: 10 additions & 0 deletions util/resource_tag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package util

// ResourceGroupTagParams are some necessary parameters used to generate ResourceGroupTag.
type ResourceGroupTagParams struct {
// FirstKey is the first key of current TiKV request.
FirstKey []byte
mornyx marked this conversation as resolved.
Show resolved Hide resolved
}

// ResourceGroupTagFactory is used to generate the ResourceGroupTag required for each request.
type ResourceGroupTagFactory func(params ResourceGroupTagParams) []byte