Skip to content

Commit

Permalink
topsql: introduce stmtstats and sql execution count (#30277)
Browse files Browse the repository at this point in the history
  • Loading branch information
mornyx authored Dec 21, 2021
1 parent 77b4e40 commit fe1aaf2
Show file tree
Hide file tree
Showing 29 changed files with 1,092 additions and 13 deletions.
24 changes: 21 additions & 3 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"go.uber.org/zap"
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback)
if resp == nil {
Expand Down Expand Up @@ -88,6 +91,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
zap.String("stmt", originalSQL))
}
}

ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
if resp == nil {
return nil, errors.New("client returns nil response")
Expand Down Expand Up @@ -149,8 +154,9 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq

// Analyze do a analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{},
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
isRestrict bool, stmtCtx *stmtctx.StatementContext) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, stmtCtx)
resp := client.Send(ctx, kvReq, vars, stmtCtx.MemTracker, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down Expand Up @@ -244,3 +250,15 @@ func init() {
systemEndian = tipb.Endian_LittleEndian
}
}

// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the
// number of SQL executions of each TiKV (if any).
func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context {
if variable.TopSQLEnabled() && stmtCtx.KvExecCounter != nil {
// Unlike calling Transaction or Snapshot interface, in distsql package we directly
// face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of
// calling SetRPCInterceptor on Transaction or Snapshot.
return interceptor.WithRPCInterceptor(ctx, stmtCtx.KvExecCounter.RPCInterceptor())
}
return ctx
}
2 changes: 1 addition & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestAnalyze(t *testing.T) {
Build()
require.NoError(t, err)

response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx.MemTracker)
response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx)
require.NoError(t, err)

result, ok := response.(*selectResult)
Expand Down
31 changes: 31 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
ctx = opentracing.ContextWithSpan(ctx, span1)
}
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
if err != nil {
Expand Down Expand Up @@ -383,6 +384,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -896,6 +898,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
a.observeStmtFinishedForTopSQL()
if sessVars.StmtCtx.IsTiFlash.Load() {
if succ {
totalTiFlashQuerySuccCounter.Inc()
Expand Down Expand Up @@ -1247,3 +1250,31 @@ func (a *ExecStmt) GetTextToLog() string {
}
return sql
}

func (a *ExecStmt) observeStmtBeginForTopSQL() {
if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil {
sqlDigest, planDigest := a.getSQLPlanDigest()
vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest)
// This is a special logic prepared for TiKV's SQLExecCount.
vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest)
}
}

func (a *ExecStmt) observeStmtFinishedForTopSQL() {
if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil {
sqlDigest, planDigest := a.getSQLPlanDigest()
vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest)
}
}

func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) {
var sqlDigest, planDigest []byte
vars := a.Ctx.GetSessionVars()
if _, d := vars.StmtCtx.SQLDigest(); d != nil {
sqlDigest = d.Bytes()
}
if _, d := vars.StmtCtx.GetPlanDigest(); d != nil {
planDigest = d.Bytes()
}
return sqlDigest, planDigest
}
6 changes: 4 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
return err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -763,7 +763,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
return nil, err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1854,6 +1854,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
Expand All @@ -1875,6 +1876,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.IsolationLevel, kv.SI)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, readReplicaType)
Expand Down
1 change: 1 addition & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
})
}
setResourceGroupTaggerForTxn(stmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(sessVars, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
2 changes: 1 addition & 1 deletion executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *ChecksumTableExec) checksumWorker(taskCh <-chan *checksumTask, resultCh
}

func (e *ChecksumTableExec) handleChecksumRequest(req *kv.Request) (resp *tipb.ChecksumResponse, err error) {
ctx := context.TODO()
ctx := distsql.WithSQLKvExecCounterInterceptor(context.TODO(), e.ctx.GetSessionVars().StmtCtx)
res, err := distsql.Checksum(ctx, e.ctx.GetClient(), req, e.ctx.GetSessionVars().KVVars)
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,3 +1914,11 @@ func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snap
snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
}
}

// setRPCInterceptorOfExecCounterForTxn binds an interceptor for client-go to count
// the number of SQL executions of each TiKV.
func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot kv.Snapshot) {
if snapshot != nil && variable.TopSQLEnabled() && vars.StmtCtx.KvExecCounter != nil {
snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor())
}
}
1 change: 1 addition & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn)
setRPCInterceptorOfExecCounterForTxn(sessVars, txn)
txnSize := txn.Size()
sessVars.StmtCtx.AddRecordRows(uint64(len(rows)))
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
Expand Down
1 change: 1 addition & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
}
})
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), e.snapshot)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
}
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, txn)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), txn)
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
Expand Down
4 changes: 4 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
txn, err := e.ctx.Txn(true)
if err == nil {
txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor())
}
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 h1:38Jst/O36MKXAt7aD1Ipnx4nKwclG66ifkcmi4f0NZ4=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
Expand Down
4 changes: 3 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ const (
ResourceGroupTagger
// KVFilter indicates the filter to ignore key-values in the transaction's memory buffer.
KVFilter

// SnapInterceptor is used for setting the interceptor for snapshot
SnapInterceptor
// CommitTSUpperBoundChec is used by cached table
// The commitTS must be greater than all the write lock lease of the visited cached table.
CommitTSUpperBoundCheck
// RPCInterceptor is interceptor.RPCInterceptor on Transaction or Snapshot, used to decorate
// additional logic before and after the underlying client-go RPC request.
RPCInterceptor
)

// ReplicaReadType is the type of replica to read data from
Expand Down
7 changes: 7 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ func (s *session) doCommit(ctx context.Context) error {
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
}
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
Expand Down Expand Up @@ -2311,6 +2315,9 @@ func (s *session) Close() {
s.RollbackTxn(ctx)
if s.sessionVars != nil {
s.sessionVars.WithdrawAllPreparedStmt()
if s.sessionVars.StmtStats != nil {
s.sessionVars.StmtStats.SetFinished()
}
}
s.ClearDiskFullOpt()
}
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -207,6 +208,12 @@ type StatementContext struct {

// WaitLockLeaseTime is the duration of cached table read lease expiration time.
WaitLockLeaseTime time.Duration

// KvExecCounter is created from SessionVars.StmtStats to count the number of SQL
// executions of the kv layer during the current execution of the statement.
// Its life cycle is limited to this execution, and a new KvExecCounter is
// always created during each statement execution.
KvExecCounter *stmtstats.KvExecCounter
}

// StmtHints are SessionVars related sql hints.
Expand Down
12 changes: 10 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"sync/atomic"
"time"

utilMath "github.com/pingcap/tidb/util/math"

"github.com/pingcap/errors"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
Expand All @@ -48,10 +46,12 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
utilMath "github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -969,6 +969,13 @@ type SessionVars struct {

// EnablePaging indicates whether enable paging in coprocessor requests.
EnablePaging bool

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
// all the local data in each session, and finally report them to the remote
// regularly.
StmtStats *stmtstats.StatementStats
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down Expand Up @@ -1203,6 +1210,7 @@ func NewSessionVars() *SessionVars {
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
EnablePlacementChecks: DefEnablePlacementCheck,
Rng: utilMath.NewWithTime(),
StmtStats: stmtstats.CreateStatementStats(),
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/driver/options"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/txnkv/txnutil"
)
Expand Down Expand Up @@ -120,6 +121,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
s.KVSnapshot.SetReadReplicaScope(val.(string))
case kv.SnapInterceptor:
s.interceptor = val.(kv.SnapshotInterceptor)
case kv.RPCInterceptor:
s.KVSnapshot.SetRPCInterceptor(val.(interceptor.RPCInterceptor))
}
}

Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

Expand Down Expand Up @@ -232,6 +233,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.snapshotInterceptor = val.(kv.SnapshotInterceptor)
case kv.CommitTSUpperBoundCheck:
txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool))
case kv.RPCInterceptor:
txn.KVTxn.SetRPCInterceptor(val.(interceptor.RPCInterceptor))
}
}

Expand Down
Loading

0 comments on commit fe1aaf2

Please sign in to comment.