store/copr: handle scan details & record next wait details (#41580)
close #41582
you06 authored Feb 23, 2023
1 parent 2150484 commit 7e0099c
Showing 3 changed files with 83 additions and 19 deletions.
42 changes: 41 additions & 1 deletion executor/distsql.go
@@ -75,7 +75,9 @@ type lookupTableTask struct {
idxRows *chunk.Chunk
cursor int

doneCh chan error
// buildDoneTime is set to the current instant once the cop task is built, for the Next wait duration statistics.
buildDoneTime time.Time
doneCh chan error

// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
@@ -793,13 +795,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
return e.resultCurr, nil
}
var (
enableStats = e.stats != nil
start time.Time
indexFetchedInstant time.Time
)
if enableStats {
start = time.Now()
}
task, ok := <-e.resultCh
if !ok {
return nil, nil
}
if enableStats {
indexFetchedInstant = time.Now()
}
if err := <-task.doneCh; err != nil {
return nil, err
}
if enableStats {
e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
if task.buildDoneTime.After(indexFetchedInstant) {
e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant)
indexFetchedInstant = task.buildDoneTime
}
e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
}

// Release the memory usage of last task before we handle a new task.
if e.resultCurr != nil {
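
The hunk above splits the time a `Next` call spends in getResultTask into three buckets: waiting for the index side to hand over a task (NextWaitIndexScan), waiting for that task's table reader to be built (NextWaitTableLookUpBuild), and waiting for the table lookup response itself (NextWaitTableLookUpResp). Below is a minimal, standalone sketch of that bookkeeping — not the TiDB implementation; fakeTask, the sleeps, and the channel setup are hypothetical, only the measurement mirrors the diff.

package main

import (
    "fmt"
    "time"
)

type fakeTask struct {
    buildDoneTime time.Time  // stamped by the worker once the "table reader" is built
    doneCh        chan error // receives the result of the table lookup
}

func main() {
    resultCh := make(chan *fakeTask, 1)

    // Simulated worker: produce the task (index side), then build, then respond.
    go func() {
        task := &fakeTask{doneCh: make(chan error, 1)}
        time.Sleep(10 * time.Millisecond) // index scan producing handles
        resultCh <- task
        time.Sleep(5 * time.Millisecond)  // building the table reader
        task.buildDoneTime = time.Now()   // analogous to task.buildDoneTime in executeTask
        time.Sleep(20 * time.Millisecond) // waiting for the table lookup response
        task.doneCh <- nil
    }()

    start := time.Now()
    task := <-resultCh // counted as NextWaitIndexScan
    indexFetched := time.Now()
    if err := <-task.doneCh; err != nil {
        panic(err)
    }

    waitIndex := indexFetched.Sub(start)
    var waitBuild time.Duration
    if task.buildDoneTime.After(indexFetched) {
        waitBuild = task.buildDoneTime.Sub(indexFetched) // NextWaitTableLookUpBuild
        indexFetched = task.buildDoneTime
    }
    waitResp := time.Since(indexFetched) // NextWaitTableLookUpResp

    fmt.Printf("wait_index: %v, wait_table_lookup_build: %v, wait_table_lookup_resp: %v\n",
        waitIndex, waitBuild, waitResp)
}
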
@@ -1122,6 +1143,11 @@ type IndexLookUpRunTimeStats struct {
TableRowScan int64
TableTaskNum int64
Concurrency int

// Details of the wait durations observed during `Next` calls.
NextWaitIndexScan time.Duration
NextWaitTableLookUpBuild time.Duration
NextWaitTableLookUpResp time.Duration
}

func (e *IndexLookUpRunTimeStats) String() string {
@@ -1145,6 +1171,16 @@ func (e *IndexLookUpRunTimeStats) String() string {
}
buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
}

if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
execdetails.FormatDuration(e.NextWaitIndexScan),
execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
execdetails.FormatDuration(e.NextWaitTableLookUpResp))
}
}
return buf.String()
}

@@ -1165,6 +1201,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.NextWaitIndexScan += tmp.NextWaitIndexScan
e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
}

// Tp implements the RuntimeStats interface.
@@ -1312,6 +1351,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
// Then we hold the returning rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.idxLookup.buildTableReader(ctx, task)
task.buildDoneTime = time.Now()
if err != nil {
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
23 changes: 15 additions & 8 deletions executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {

func TestIndexLookUpStats(t *testing.T) {
stats := &executor.IndexLookUpRunTimeStats{
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
NextWaitIndexScan: time.Second,
NextWaitTableLookUpBuild: 2 * time.Second,
NextWaitTableLookUpResp: 3 * time.Second,
}
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
}

func TestIndexLookUpGetResultChunk(t *testing.T) {
37 changes: 27 additions & 10 deletions store/copr/coprocessor.go
@@ -308,7 +308,15 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
return buildTiDBMemCopTasks(ranges, req)
}

hints := req.FixedRowCountHint
rangesLen := ranges.Len()
// Since ranges from multi partitions may be pushed to one cop iterator,
// the relationship between hints and ranges is probably broken.
// But multi-partitioned ranges and hints should not exist at the same time,
// this check only guarantees there is no out-of-range use.
if len(hints) != rangesLen {
hints = nil
}

// TODO(youjiali1995): is there any request type that needn't be split by buckets?
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
@@ -345,7 +353,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
nextI := mathutil.Min(i+rangesPerTask, rLen)
hint := -1
// calculate the row count hint
if req.FixedRowCountHint != nil {
if hints != nil {
startKey, endKey := loc.Ranges.At(i).StartKey, loc.Ranges.At(nextI-1).EndKey
// move to the previous range if startKey of current range is lower than endKey of previous location.
// In the following example, task1 will move origRangeIdx to region(i, z).
@@ -362,7 +370,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
origRangeIdx = nextOrigRangeIdx
break
}
hint += req.FixedRowCountHint[nextOrigRangeIdx]
hint += hints[nextOrigRangeIdx]
}
}
task := &copTask{
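
The hunks above make the per-range row count hints optional: hints are indexed by the original range position, so when ranges from several partitions are pushed into one cop iterator the hint slice may no longer line up with them, and indexing it would run out of range — hence hints is dropped whenever len(hints) != ranges.Len(). A standalone sketch of the aggregation under that guard, with hypothetical names and a simplified mapping (the real loop tracks origRangeIdx across region boundaries):

package main

import "fmt"

// taskRowCountHint sums the per-range hints covered by one cop task and falls
// back to -1 ("unknown") when the hints cannot be trusted, which is the same
// fallback buildCopTasks takes when the hint slice is dropped.
func taskRowCountHint(hints []int, i, nextI int) int {
    if hints == nil || nextI > len(hints) {
        return -1
    }
    hint := 0
    for idx := i; idx < nextI; idx++ {
        hint += hints[idx]
    }
    return hint
}

func main() {
    hints := []int{100, 200, 50}
    fmt.Println(taskRowCountHint(hints, 0, 2)) // 300: ranges 0 and 1 end up in one task
    fmt.Println(taskRowCountHint(nil, 0, 2))   // -1: hints dropped, row count unknown
}
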
@@ -1160,13 +1168,13 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
if err != nil {
return remains, err
}
return worker.handleBatchRemainsOnErr(bo, remains, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, remains, resp.pbResp.BatchResponses, task, ch)
}
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
return worker.handleBatchRemainsOnErr(bo, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
return worker.handleBatchRemainsOnErr(bo, rpcCtx, []*copTask{task}, resp.pbResp.BatchResponses, task, ch)
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
@@ -1255,27 +1263,34 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
}
batchResps := resp.pbResp.BatchResponses
worker.sendToRespCh(resp, ch, true)
return worker.handleBatchCopResponse(bo, batchResps, task.batchTaskList, ch)
return worker.handleBatchCopResponse(bo, rpcCtx, batchResps, task.batchTaskList, ch)
}

func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchRemainsOnErr(bo *Backoffer, rpcCtx *tikv.RPCContext, remains []*copTask, batchResp []*coprocessor.StoreBatchTaskResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(task.batchTaskList) == 0 {
return remains, nil
}
batchedTasks := task.batchTaskList
task.batchTaskList = nil
batchedRemains, err := worker.handleBatchCopResponse(bo, batchResp, batchedTasks, ch)
batchedRemains, err := worker.handleBatchCopResponse(bo, rpcCtx, batchResp, batchedTasks, ch)
if err != nil {
return nil, err
}
return append(remains, batchedRemains...), nil
}

// handle the batched cop response.
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, rpcCtx *tikv.RPCContext, batchResps []*coprocessor.StoreBatchTaskResponse, tasks map[uint64]*batchedCopTask, ch chan<- *copResponse) ([]*copTask, error) {
if len(tasks) == 0 {
return nil, nil
}
// need Addr for recording details.
var dummyRPCCtx *tikv.RPCContext
if rpcCtx != nil {
dummyRPCCtx = &tikv.RPCContext{
Addr: rpcCtx.Addr,
}
}
var remainTasks []*copTask
for _, batchResp := range batchResps {
batchedTask, ok := tasks[batchResp.GetTaskId()]
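
The dummyRPCCtx above keeps only the store address of the original RPC context, because that is the one piece of routing information the execution-detail recording for batched task responses needs (the batched tasks were served by the same store as the parent request). A standalone, hedged sketch of that shallow copy — rpcContext here is a hypothetical local type, not client-go's tikv.RPCContext:

package main

import "fmt"

type rpcContext struct {
    Addr string
    // ... other routing fields that the batched path does not need
}

// addrOnlyCtx mirrors the dummyRPCCtx construction: carry over the address,
// drop everything else.
func addrOnlyCtx(ctx *rpcContext) *rpcContext {
    if ctx == nil {
        return nil
    }
    return &rpcContext{Addr: ctx.Addr}
}

func main() {
    fmt.Println(addrOnlyCtx(&rpcContext{Addr: "tikv-0:20160"}).Addr) // tikv-0:20160
}
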
@@ -1284,7 +1299,8 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
}
resp := &copResponse{
pbResp: &coprocessor.Response{
Data: batchResp.Data,
Data: batchResp.Data,
ExecDetailsV2: batchResp.ExecDetailsV2,
},
}
task := batchedTask.task
Expand Down Expand Up @@ -1331,8 +1347,9 @@ func (worker *copIteratorWorker) handleBatchCopResponse(bo *Backoffer, batchResp
}
return nil, errors.Trace(err)
}
worker.handleCollectExecutionInfo(bo, dummyRPCCtx, resp)
// TODO: check OOM
worker.sendToRespCh(resp, ch, false)
worker.sendToRespCh(resp, ch, true)
}
return remainTasks, nil
}
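
This last hunk is the "handle scan details" half of the commit: each batched task response now carries its own ExecDetailsV2 into the wrapped copResponse, and that response is passed through handleCollectExecutionInfo with the address-only context before being sent on, so its scan details are recorded like those of any ordinary task. A standalone sketch of the wrapping with hypothetical, trimmed-down types (not the real kvproto messages):

package main

import "fmt"

// Hypothetical mirrors of the kvproto messages, reduced to the fields the
// wrapping touches.
type execDetailsV2 struct{ ScanDetail string }

type storeBatchTaskResponse struct {
    Data          []byte
    ExecDetailsV2 *execDetailsV2
}

type response struct {
    Data          []byte
    ExecDetailsV2 *execDetailsV2
}

// wrapBatchedResponse mirrors the copResponse construction in
// handleBatchCopResponse: the batched task's execution details travel with its
// data, so the shared collection path can record them per task.
func wrapBatchedResponse(b *storeBatchTaskResponse) *response {
    return &response{Data: b.Data, ExecDetailsV2: b.ExecDetailsV2}
}

func main() {
    b := &storeBatchTaskResponse{
        Data:          []byte("rows"),
        ExecDetailsV2: &execDetailsV2{ScanDetail: "total_keys: 42"},
    }
    fmt.Println(wrapBatchedResponse(b).ExecDetailsV2.ScanDetail)
}
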
