Skip to content

Commit

Permalink
store: forbid collecting info if enable-collect-execution-info disabl…
Browse files Browse the repository at this point in the history
…ed (#31282) (#31409)

close #31038
  • Loading branch information
ti-srebot committed Feb 23, 2022
1 parent d82ee69 commit 200b1e0
Show file tree
Hide file tree
Showing 13 changed files with 157 additions and 97 deletions.
4 changes: 1 addition & 3 deletions br/pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/util/memory"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -395,7 +393,7 @@ type mockChecksumKVClient struct {
}

// a mock client for checksum request
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) kv.Response {
func (c *mockChecksumKVClient) Send(ctx context.Context, req *kv.Request, vars interface{}, option *kv.ClientSendOption) kv.Response {
if c.curErrCount < c.maxErrCount {
c.curErrCount++
return &mockErrorResponse{err: "tikv timeout"}
Expand Down
17 changes: 12 additions & 5 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,7 +89,13 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
zap.String("stmt", originalSQL))
}
}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
option := &kv.ClientSendOption{
SessionMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
EnabledRateLimitAction: enabledRateLimitAction,
EventCb: eventCb,
EnableCollectExecutionInfo: config.GetGlobalConfig().EnableCollectExecutionInfo,
}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, option)
if resp == nil {
err := errors.New("client returns nil response")
return nil, err
Expand Down Expand Up @@ -149,8 +156,8 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq

// Analyze do a analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{},
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
isRestrict bool, stmtCtx *stmtctx.StatementContext) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, &kv.ClientSendOption{})
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand All @@ -173,7 +180,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte
func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{}) (SelectResult, error) {
// FIXME: As BR have dependency of `Checksum` and TiDB also introduced BR as dependency, Currently we can't edit
// Checksum function signature. The two-way dependence should be removed in future.
resp := client.Send(ctx, kvReq, vars, nil, false, nil)
resp := client.Send(ctx, kvReq, vars, &kv.ClientSendOption{})
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down
2 changes: 1 addition & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestAnalyze(t *testing.T) {
Build()
require.NoError(t, err)

response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx.MemTracker)
response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx)
require.NoError(t, err)

result, ok := response.(*selectResult)
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
return err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -763,7 +763,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
return nil, err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return nil, err
}
Expand Down
15 changes: 15 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9426,6 +9426,21 @@ func (s *testSerialSuite) TestIssue28650(c *C) {
}
}

// Details at https://github.com/pingcap/tidb/issues/31038
func (s *testSerialSuite) TestFix31038(c *C) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableCollectExecutionInfo = false
})
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t123")
tk.MustExec("create table t123 (id int);")
failpoint.Enable("github.com/pingcap/tidb/store/copr/disable-collect-execution", `return(true)`)
tk.MustQuery("select * from t123;")
failpoint.Disable("github.com/pingcap/tidb/store/copr/disable-collect-execution")
}

func (s *testSerialSuite) TestEncodingSet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 1 addition & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -1478,8 +1478,7 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
if err != nil {
return err
}

resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, false, nil)
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, &kv.ClientSendOption{})
if resp == nil {
err := errors.New("client returns nil response")
return err
Expand Down
9 changes: 5 additions & 4 deletions infoschema/tables_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,13 @@ func TestStmtSummaryTable(t *testing.T) {
defer func() { require.NoError(t, failpoint.Disable(failpointName)) }()
tk.MustQuery("select * from t where a=2")

// sum_cop_task_num is always 0 if tidb_enable_collect_execution_info disabled
sql = "select stmt_type, schema_name, table_names, index_names, exec_count, sum_cop_task_num, avg_total_keys, " +
"max_total_keys, avg_processed_keys, max_processed_keys, avg_write_keys, max_write_keys, avg_prewrite_regions, " +
"max_prewrite_regions, avg_affected_rows, query_sample_text, plan " +
"from information_schema.statements_summary " +
"where digest_text like 'select * from `t`%'"
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
"\tIndexLookUp_10 \troot \t100 \t\n" +
"\t├─IndexRangeScan_8\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableRowIDScan_9\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo"))
Expand All @@ -808,7 +809,7 @@ func TestStmtSummaryTable(t *testing.T) {
"from information_schema.statements_summary " +
"where digest_text like 'select * from `t`%'"
tk.MustQuery(sql).Check(testkit.Rows(
"Select test test.t t:k 2 4 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
"Select test test.t t:k 2 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
"\tIndexLookUp_10 \troot \t100 \t\n" +
"\t├─IndexRangeScan_8\tcop[tikv]\t100 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableRowIDScan_9\tcop[tikv]\t100 \ttable:t, keep order:false, stats:pseudo"))
Expand Down Expand Up @@ -858,7 +859,7 @@ func TestStmtSummaryTable(t *testing.T) {
"max_prewrite_regions, avg_affected_rows, query_sample_text, plan " +
"from information_schema.statements_summary " +
"where digest_text like 'select * from `t`%'"
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 2 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 1 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
"\tIndexLookUp_10 \troot \t1000 \t\n" +
"\t├─IndexRangeScan_8\tcop[tikv]\t1000 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableRowIDScan_9\tcop[tikv]\t1000 \ttable:t, keep order:false, stats:pseudo"))
Expand All @@ -877,7 +878,7 @@ func TestStmtSummaryTable(t *testing.T) {
"max_prewrite_regions, avg_affected_rows, query_sample_text, plan " +
"from information_schema.statements_summary " +
"where digest_text like 'select * from `t`%'"
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 2 4 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
tk.MustQuery(sql).Check(testkit.Rows("Select test test.t t:k 2 0 0 0 0 0 0 0 0 0 0 select * from t where a=2 \tid \ttask \testRows\toperator info\n" +
"\tIndexLookUp_10 \troot \t1000 \t\n" +
"\t├─IndexRangeScan_8\tcop[tikv]\t1000 \ttable:t, index:k(a), range:[2,2], keep order:false, stats:pseudo\n" +
"\t└─TableRowIDScan_9\tcop[tikv]\t1000 \ttable:t, keep order:false, stats:pseudo"))
Expand Down
10 changes: 9 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,20 @@ type Transaction interface {
// Client is used to send request to KV layer.
type Client interface {
// Send sends request to KV layer, returns a Response.
Send(ctx context.Context, req *Request, vars interface{}, sessionMemTracker *memory.Tracker, enabledRateLimitAction bool, eventCb trxevents.EventCallback) Response
Send(ctx context.Context, req *Request, vars interface{}, option *ClientSendOption) Response

// IsRequestTypeSupported checks if reqType and subType is supported.
IsRequestTypeSupported(reqType, subType int64) bool
}

// ClientSendOption wraps options during Client Send
type ClientSendOption struct {
SessionMemTracker *memory.Tracker
EnabledRateLimitAction bool
EventCb trxevents.EventCallback
EnableCollectExecutionInfo bool
}

// ReqTypes.
const (
ReqTypeSelect = 101
Expand Down
47 changes: 28 additions & 19 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func buildBatchCopTasks(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges
}
}

func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.Variables) kv.Response {
func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.Variables, option *kv.ClientSendOption) kv.Response {
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
Expand All @@ -632,11 +632,12 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
return copErrorResponse{err}
}
it := &batchCopIterator{
store: c.store.kvStore,
req: req,
finishCh: make(chan struct{}),
vars: vars,
rpcCancel: tikv.NewRPCanceller(),
store: c.store.kvStore,
req: req,
finishCh: make(chan struct{}),
vars: vars,
rpcCancel: tikv.NewRPCanceller(),
enableCollectExecutionInfo: option.EnableCollectExecutionInfo,
}
ctx = context.WithValue(ctx, tikv.RPCCancellerCtxKey{}, it.rpcCancel)
it.tasks = tasks
Expand Down Expand Up @@ -664,6 +665,8 @@ type batchCopIterator struct {
// There are two cases we need to close the `finishCh` channel, one is when context is done, the other one is
// when the Close is called. we use atomic.CompareAndSwap `closed` to to make sure the channel is not closed twice.
closed uint32

enableCollectExecutionInfo bool
}

func (b *batchCopIterator) run(ctx context.Context) {
Expand Down Expand Up @@ -774,7 +777,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash.

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient())
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo)
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, &coprocessor.RegionInfo{
Expand Down Expand Up @@ -877,22 +880,13 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro
return
}

resp := batchCopResponse{
resp := &batchCopResponse{
pbResp: response,
detail: new(CopRuntimeStats),
}

backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = task.storeAddr

b.sendToRespCh(&resp)
b.handleCollectExecutionInfo(bo, resp, task)
b.sendToRespCh(resp)

return
}
Expand All @@ -905,3 +899,18 @@ func (b *batchCopIterator) sendToRespCh(resp *batchCopResponse) (exit bool) {
}
return
}

func (b *batchCopIterator) handleCollectExecutionInfo(bo *Backoffer, resp *batchCopResponse, task *batchCopTask) {
if !b.enableCollectExecutionInfo {
return
}
backoffTimes := bo.GetBackoffTimes()
resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = task.storeAddr
}
8 changes: 5 additions & 3 deletions store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ type RegionInfo struct {
// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
type RegionBatchRequestSender struct {
*tikv.RegionRequestSender
enableCollectExecutionInfo bool
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
enableCollectExecutionInfo: enableCollectExecutionInfo,
}
}

Expand All @@ -59,7 +61,7 @@ func (ss *RegionBatchRequestSender) SendReqToAddr(bo *Backoffer, rpcCtx *tikv.RP
}
start := time.Now()
resp, err = ss.GetClient().SendRequest(ctx, rpcCtx.Addr, req, timout)
if ss.Stats != nil {
if ss.Stats != nil && ss.enableCollectExecutionInfo {
tikv.RecordRegionRequestRuntimeStats(ss.Stats, req.Type, time.Since(start))
}
if err != nil {
Expand Down
Loading

0 comments on commit 200b1e0

Please sign in to comment.