From d6399ea7f93696e98baba1416cb4ad03f855cd87 Mon Sep 17 00:00:00 2001
From: you06
Date: Mon, 6 Feb 2023 12:53:28 +0800
Subject: [PATCH] record next wait time for IndexLookUp

Signed-off-by: you06
---
 executor/distsql.go       | 31 ++++++++++++++++++++++++++++++-
 executor/distsql_test.go  | 23 +++++++++++++++--------
 store/copr/coprocessor.go |  8 ++++----
 3 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/executor/distsql.go b/executor/distsql.go
index 3b9a6a7d4b288..087e20cb7ed28 100644
--- a/executor/distsql.go
+++ b/executor/distsql.go
@@ -76,7 +76,9 @@ type lookupTableTask struct {
 	idxRows *chunk.Chunk
 	cursor  int
 
-	doneCh chan error
+	// buildDone records the instant the table reader for this task was built; Next uses it to attribute its wait duration.
+	buildDone time.Time
+	doneCh    chan error
 
 	// indexOrder map is used to save the original index order for the handles.
 	// Without this map, the original index order might be lost.
@@ -790,13 +792,23 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
 	if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
 		return e.resultCurr, nil
 	}
+	start := time.Now()
 	task, ok := <-e.resultCh
 	if !ok {
 		return nil, nil
 	}
+	indexFetchedInstant := time.Now()
 	if err := <-task.doneCh; err != nil {
 		return nil, err
 	}
+	if e.stats != nil { // stats are only collected when runtime stats are enabled
+		e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
+		if task.buildDone.After(indexFetchedInstant) {
+			e.stats.NextWaitTableLookUpBuild += task.buildDone.Sub(indexFetchedInstant)
+			indexFetchedInstant = task.buildDone
+		}
+		e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
+	}
 
 	// Release the memory usage of last task before we handle a new task.
 	if e.resultCurr != nil {
@@ -1119,6 +1131,10 @@ type IndexLookUpRunTimeStats struct {
 	TableRowScan int64
 	TableTaskNum int64
 	Concurrency  int
+	// NextWait* record how long Next blocks on each stage.
+	NextWaitIndexScan        time.Duration
+	NextWaitTableLookUpBuild time.Duration
+	NextWaitTableLookUpResp  time.Duration
 }
 
 func (e *IndexLookUpRunTimeStats) String() string {
@@ -1142,6 +1158,15 @@ func (e *IndexLookUpRunTimeStats) String() string {
 		}
 		buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
 	}
+	if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
+		if buf.Len() > 0 {
+			buf.WriteByte(',')
+		}
+		fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
+			execdetails.FormatDuration(e.NextWaitIndexScan),
+			execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
+			execdetails.FormatDuration(e.NextWaitTableLookUpResp))
+	}
 	return buf.String()
 }
 
@@ -1162,6 +1187,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
 	e.TaskWait += tmp.TaskWait
 	e.TableRowScan += tmp.TableRowScan
 	e.TableTaskNum += tmp.TableTaskNum
+	e.NextWaitIndexScan += tmp.NextWaitIndexScan
+	e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
+	e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
 }
 
 // Tp implements the RuntimeStats interface.
@@ -1300,6 +1328,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
 // Then we hold the returning rows and finish this task.
 func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
 	tableReader, err := w.idxLookup.buildTableReader(ctx, task)
+	task.buildDone = time.Now()
 	if err != nil {
 		logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
 		return err
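The hunks above implement a simple attribution scheme: the time Next spends blocked in getResultTask is split into waiting for an index-scan task to arrive on resultCh, waiting for the table reader build to finish, and waiting for the table-lookup response. Below is a minimal, self-contained sketch of that pattern, using hypothetical names (task, waitStats, consume) rather than the TiDB types: the producer stamps the task when its build step completes, and the consumer charges each gap to the matching bucket.

    package main

    import (
        "fmt"
        "time"
    )

    type task struct {
        buildDone time.Time  // stamped by the producer once the task is built
        doneCh    chan error // receives nil (or an error) when the result is ready
    }

    type waitStats struct {
        waitArrive, waitBuild, waitResp time.Duration
    }

    // consume mirrors the getResultTask pattern: measure the receive, then
    // charge the remaining blocked time to build wait versus response wait.
    func consume(taskCh <-chan *task, stats *waitStats) error {
        start := time.Now()
        t, ok := <-taskCh
        if !ok {
            return nil
        }
        arrived := time.Now()
        if err := <-t.doneCh; err != nil {
            return err
        }
        stats.waitArrive += arrived.Sub(start)
        // Any gap between arrival and the producer's build stamp is build
        // wait; whatever is left is response wait.
        if t.buildDone.After(arrived) {
            stats.waitBuild += t.buildDone.Sub(arrived)
            arrived = t.buildDone
        }
        stats.waitResp += time.Since(arrived)
        return nil
    }

    func main() {
        taskCh := make(chan *task, 1)
        t := &task{doneCh: make(chan error, 1)}
        go func() {
            taskCh <- t                       // hand the task over immediately
            time.Sleep(10 * time.Millisecond) // pretend to build the table reader
            t.buildDone = time.Now()
            time.Sleep(20 * time.Millisecond) // pretend to fetch the rows
            t.doneCh <- nil                   // publishes buildDone to the consumer
        }()
        var stats waitStats
        if err := consume(taskCh, &stats); err != nil {
            panic(err)
        }
        fmt.Printf("arrive=%v build=%v resp=%v\n", stats.waitArrive, stats.waitBuild, stats.waitResp)
    }

Reading buildDone only after the receive from doneCh keeps the sketch race-free, since the producer writes the stamp before sending on doneCh.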
diff --git a/executor/distsql_test.go b/executor/distsql_test.go
index 65889a10d0377..50c4a311a1eb9 100644
--- a/executor/distsql_test.go
+++ b/executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {
 
 func TestIndexLookUpStats(t *testing.T) {
 	stats := &executor.IndexLookUpRunTimeStats{
-		FetchHandleTotal: int64(5 * time.Second),
-		FetchHandle:      int64(2 * time.Second),
-		TaskWait:         int64(2 * time.Second),
-		TableRowScan:     int64(2 * time.Second),
-		TableTaskNum:     2,
-		Concurrency:      1,
+		FetchHandleTotal:         int64(5 * time.Second),
+		FetchHandle:              int64(2 * time.Second),
+		TaskWait:                 int64(2 * time.Second),
+		TableRowScan:             int64(2 * time.Second),
+		TableTaskNum:             2,
+		Concurrency:              1,
+		NextWaitIndexScan:        time.Second,
+		NextWaitTableLookUpBuild: 2 * time.Second,
+		NextWaitTableLookUpResp:  3 * time.Second,
 	}
-	require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
+	require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
+		", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
+		", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
 	require.Equal(t, stats.Clone().String(), stats.String())
 	stats.Merge(stats.Clone())
-	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
+	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
+		", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
+		", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
 }
 
 func TestIndexLookUpGetResultChunk(t *testing.T) {
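The rewritten test pins down the Clone/Merge/String contract: String() must be stable across Clone(), and Merge() must sum every field, including the three new NextWait* durations, or the doubled expectations above would not hold. Here is a trimmed-down sketch of that contract with hypothetical types, not the actual executor structs:

    package main

    import (
        "fmt"
        "time"
    )

    type nextStats struct {
        WaitIndex, WaitBuild, WaitResp time.Duration
    }

    // Clone copies the stats so a snapshot can be merged back in later.
    func (s *nextStats) Clone() *nextStats { c := *s; return &c }

    // Merge must add every field; forgetting one silently under-reports it.
    func (s *nextStats) Merge(o *nextStats) {
        s.WaitIndex += o.WaitIndex
        s.WaitBuild += o.WaitBuild
        s.WaitResp += o.WaitResp
    }

    func (s *nextStats) String() string {
        return fmt.Sprintf("next: {wait_index: %v, wait_table_lookup_build: %v, wait_table_lookup_resp: %v}",
            s.WaitIndex, s.WaitBuild, s.WaitResp)
    }

    func main() {
        s := &nextStats{WaitIndex: time.Second, WaitBuild: 2 * time.Second, WaitResp: 3 * time.Second}
        fmt.Println(s) // next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}
        s.Merge(s.Clone())
        fmt.Println(s) // next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}
    }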
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 542ac89110992..cbcc4b65894ab 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -345,7 +345,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
 	}
 
 	var builder taskBuilder
-	if req.StoreBatchSize > 0 {
+	if req.StoreBatchSize > 0 && hints != nil {
 		builder = newBatchTaskBuilder(bo, req, cache)
 	} else {
 		builder = newLegacyTaskBuilder(len(locs))
@@ -1115,7 +1115,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 		Tasks:          task.ToPBBatchTasks(),
 	}
 
-	cacheKey, cacheValue := worker.buildCacheKey(task, &copReq, false)
+	cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)
 
 	req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
 		IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
@@ -1496,10 +1496,10 @@ func (worker *copIteratorWorker) handleLockErr(bo *Backoffer, lockErr *kvrpcpb.L
 	return nil
 }
 
-func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request, force bool) (cacheKey []byte, cacheValue *coprCacheValue) {
+func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request) (cacheKey []byte, cacheValue *coprCacheValue) {
 	// If there are many ranges, it is very likely to be a TableLookupRequest. Such requests are not worth caching, since
 	// computing is not the main cost. Ignore requests with many ranges directly to avoid slowly building the cache key.
-	if force || task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
+	if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
 		cKey, err := coprCacheBuildKey(copReq)
 		if err == nil {
 			cacheKey = cKey
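With the force flag gone, buildCacheKey relies purely on its admission checks: every cheap precondition has to pass before the comparatively expensive key serialization runs. The sketch below shows that guard-before-build shape with hypothetical names and an assumed range threshold; the real admission limit lives behind the coprocessor cache's CheckRequestAdmission.

    package main

    import "fmt"

    // admissionMaxRanges is an assumed threshold for illustration only.
    const admissionMaxRanges = 500

    type request struct {
        cacheable bool
        ranges    int
    }

    // buildKey runs the cheap admission checks first and only then pays
    // for key construction, mirroring the guard-before-build shape above.
    func buildKey(r *request) (string, bool) {
        if !r.cacheable || r.ranges > admissionMaxRanges {
            return "", false
        }
        return fmt.Sprintf("key-with-%d-ranges", r.ranges), true
    }

    func main() {
        for _, r := range []*request{
            {cacheable: true, ranges: 3},
            {cacheable: true, ranges: 10000}, // likely a table lookup: skip it
            {cacheable: false, ranges: 3},
        } {
            if key, ok := buildKey(r); ok {
                fmt.Println("cache under", key)
            } else {
                fmt.Println("not cached")
            }
        }
    }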