record next wait time for IndexLookUp
Signed-off-by: you06 <you1474600@gmail.com>
you06 committed Feb 6, 2023
1 parent 3578227 commit d6399ea
Showing 3 changed files with 47 additions and 13 deletions.
29 changes: 28 additions & 1 deletion executor/distsql.go
@@ -76,7 +76,9 @@ type lookupTableTask struct {
 	idxRows *chunk.Chunk
 	cursor  int
 
-	doneCh chan error
+	// after the cop task is built, buildDone will be set to the current instant, for Next wait duration statistic.
+	buildDone time.Time
+	doneCh    chan error
 
 	// indexOrder map is used to save the original index order for the handles.
 	// Without this map, the original index order might be lost.
@@ -790,13 +792,21 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
 	if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
 		return e.resultCurr, nil
 	}
+	start := time.Now()
 	task, ok := <-e.resultCh
 	if !ok {
 		return nil, nil
 	}
+	indexFetchedInstant := time.Now()
 	if err := <-task.doneCh; err != nil {
 		return nil, err
 	}
+	e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
+	if task.buildDone.After(indexFetchedInstant) {
+		e.stats.NextWaitTableLookUpBuild += task.buildDone.Sub(indexFetchedInstant)
+		indexFetchedInstant = task.buildDone
+	}
+	e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
 
 	// Release the memory usage of last task before we handle a new task.
 	if e.resultCurr != nil {
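
The hunk above splits the time Next spends in getResultTask into three buckets: NextWaitIndexScan (until a task arrives on resultCh), NextWaitTableLookUpBuild (any extra time until that task's table reader was built), and NextWaitTableLookUpResp (the remainder until doneCh reports completion). The snippet below is a minimal, self-contained sketch of that split; splitNextWait and the sample instants are made up for illustration and are not part of the patch.

package main

import (
	"fmt"
	"time"
)

// splitNextWait mirrors the bookkeeping in getResultTask: given the instant
// Next started waiting, the instant a task arrived from resultCh, the instant
// its table reader was built, and the instant doneCh reported completion,
// it returns the three wait buckets recorded by the patch.
func splitNextWait(start, fetched, buildDone, done time.Time) (waitIndex, waitBuild, waitResp time.Duration) {
	waitIndex = fetched.Sub(start)
	respFrom := fetched
	// Only build time that elapsed after the task was fetched is attributed
	// to wait_table_lookup_build; if the reader was built earlier, the whole
	// remainder counts as response wait.
	if buildDone.After(fetched) {
		waitBuild = buildDone.Sub(fetched)
		respFrom = buildDone
	}
	waitResp = done.Sub(respFrom)
	return waitIndex, waitBuild, waitResp
}

func main() {
	t0 := time.Now()
	// Hypothetical instants: task fetched after 10ms, reader built at 25ms, rows ready at 60ms.
	fetched := t0.Add(10 * time.Millisecond)
	buildDone := t0.Add(25 * time.Millisecond)
	done := t0.Add(60 * time.Millisecond)
	waitIndex, waitBuild, waitResp := splitNextWait(t0, fetched, buildDone, done)
	fmt.Println(waitIndex, waitBuild, waitResp) // 10ms 15ms 35ms
}
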
@@ -1119,6 +1129,10 @@ type IndexLookUpRunTimeStats struct {
 	TableRowScan int64
 	TableTaskNum int64
 	Concurrency  int
+	// record the next wait details.
+	NextWaitIndexScan        time.Duration
+	NextWaitTableLookUpBuild time.Duration
+	NextWaitTableLookUpResp  time.Duration
 }
 
 func (e *IndexLookUpRunTimeStats) String() string {
@@ -1142,6 +1156,15 @@ func (e *IndexLookUpRunTimeStats) String() string {
 		}
 		buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
 	}
+	if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
+		if buf.Len() > 0 {
+			buf.WriteByte(',')
+			fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
+				execdetails.FormatDuration(e.NextWaitIndexScan),
+				execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
+				execdetails.FormatDuration(e.NextWaitTableLookUpResp))
+		}
+	}
 	return buf.String()
 }
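
With all three durations populated, the block above appends a fragment like the one printed by this small sketch; plain Printf stands in for execdetails.FormatDuration and the durations are hypothetical.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical durations; in the executor they come from IndexLookUpRunTimeStats.
	waitIndex, waitBuild, waitResp := time.Second, 2*time.Second, 3*time.Second
	fmt.Printf(" next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}\n",
		waitIndex, waitBuild, waitResp)
	// Prints:  next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}
}
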

@@ -1162,6 +1185,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
 	e.TaskWait += tmp.TaskWait
 	e.TableRowScan += tmp.TableRowScan
 	e.TableTaskNum += tmp.TableTaskNum
+	e.NextWaitIndexScan += tmp.NextWaitIndexScan
+	e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
+	e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
 }
 
 // Tp implements the RuntimeStats interface.
@@ -1300,6 +1326,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
 // Then we hold the returning rows and finish this task.
 func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
 	tableReader, err := w.idxLookup.buildTableReader(ctx, task)
+	task.buildDone = time.Now()
 	if err != nil {
 		logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
 		return err
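
The one-line addition above stamps buildDone as soon as buildTableReader returns, before the error check, so the consumer side can tell build wait from response wait even though table workers run concurrently with Next. A rough sketch of that producer/consumer ordering follows; fakeTask and tableWorker are invented stand-ins, not the executor's real types.

package main

import (
	"fmt"
	"time"
)

// fakeTask is an invented stand-in for lookupTableTask: the worker stamps
// buildDone right after the (simulated) table reader is built and signals
// doneCh once the rows are ready.
type fakeTask struct {
	buildDone time.Time
	doneCh    chan error
}

func tableWorker(task *fakeTask) {
	time.Sleep(20 * time.Millisecond) // pretend to build the table reader
	task.buildDone = time.Now()
	time.Sleep(30 * time.Millisecond) // pretend to execute it and collect rows
	task.doneCh <- nil
}

func main() {
	task := &fakeTask{doneCh: make(chan error, 1)}
	fetched := time.Now() // instant the consumer received the task
	go tableWorker(task)
	<-task.doneCh

	respFrom := fetched
	var buildWait time.Duration
	if task.buildDone.After(fetched) { // same guard as in getResultTask
		buildWait = task.buildDone.Sub(fetched)
		respFrom = task.buildDone
	}
	fmt.Println("build wait:", buildWait.Round(time.Millisecond),
		"resp wait:", time.Since(respFrom).Round(time.Millisecond))
}
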
23 changes: 15 additions & 8 deletions executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {
 
 func TestIndexLookUpStats(t *testing.T) {
 	stats := &executor.IndexLookUpRunTimeStats{
-		FetchHandleTotal: int64(5 * time.Second),
-		FetchHandle:      int64(2 * time.Second),
-		TaskWait:         int64(2 * time.Second),
-		TableRowScan:     int64(2 * time.Second),
-		TableTaskNum:     2,
-		Concurrency:      1,
+		FetchHandleTotal:         int64(5 * time.Second),
+		FetchHandle:              int64(2 * time.Second),
+		TaskWait:                 int64(2 * time.Second),
+		TableRowScan:             int64(2 * time.Second),
+		TableTaskNum:             2,
+		Concurrency:              1,
+		NextWaitIndexScan:        time.Second,
+		NextWaitTableLookUpBuild: 2 * time.Second,
+		NextWaitTableLookUpResp:  3 * time.Second,
 	}
-	require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
+	require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
+		", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
+		", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
 	require.Equal(t, stats.Clone().String(), stats.String())
 	stats.Merge(stats.Clone())
-	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
+	require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
+		", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
+		", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
 }
 
 func TestIndexLookUpGetResultChunk(t *testing.T) {
8 changes: 4 additions & 4 deletions store/copr/coprocessor.go
@@ -345,7 +345,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c
 	}
 
 	var builder taskBuilder
-	if req.StoreBatchSize > 0 {
+	if req.StoreBatchSize > 0 && hints != nil {
 		builder = newBatchTaskBuilder(bo, req, cache)
 	} else {
 		builder = newLegacyTaskBuilder(len(locs))
@@ -1115,7 +1115,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 		Tasks: task.ToPBBatchTasks(),
 	}
 
-	cacheKey, cacheValue := worker.buildCacheKey(task, &copReq, false)
+	cacheKey, cacheValue := worker.buildCacheKey(task, &copReq)
 
 	req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq, options.GetTiKVReplicaReadType(worker.req.ReplicaRead), &worker.replicaReadSeed, kvrpcpb.Context{
 		IsolationLevel: isolationLevelToPB(worker.req.IsolationLevel),
@@ -1496,10 +1496,10 @@ func (worker *copIteratorWorker) handleLockErr(bo *Backoffer, lockErr *kvrpcpb.L
 	return nil
 }
 
-func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request, force bool) (cacheKey []byte, cacheValue *coprCacheValue) {
+func (worker *copIteratorWorker) buildCacheKey(task *copTask, copReq *coprocessor.Request) (cacheKey []byte, cacheValue *coprCacheValue) {
 	// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
 	// computing is not the main cost. Ignore requests with many ranges directly to avoid slowly building the cache key.
-	if force || task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
+	if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
 		cKey, err := coprCacheBuildKey(copReq)
 		if err == nil {
 			cacheKey = cKey
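
With the force parameter gone, a cache key is built only when the request is cacheable and passes the coprocessor cache's admission check on the number of ranges; the batched-request path in handleTaskOnce above simply skips key construction otherwise. Below is a small sketch of that admission-gated pattern, where admission and buildKey are invented stand-ins rather than the copr package's real helpers.

package main

import (
	"crypto/sha256"
	"fmt"
)

// admission stands in for coprCache.CheckRequestAdmission: requests touching
// too many ranges are not considered worth caching.
func admission(rangeCount, maxRanges int) bool {
	return rangeCount <= maxRanges
}

// buildKey stands in for buildCacheKey without the force flag: a key is built
// only when the request is cacheable and admitted, otherwise nothing is built.
func buildKey(cacheable bool, rangeCount int, reqBytes []byte) ([]byte, bool) {
	if !cacheable || !admission(rangeCount, 64) {
		return nil, false
	}
	sum := sha256.Sum256(reqBytes)
	return sum[:], true
}

func main() {
	if key, ok := buildKey(true, 10, []byte("small cop request")); ok {
		fmt.Printf("cache key: %x...\n", key[:8])
	}
	if _, ok := buildKey(true, 1000, []byte("table lookup with many ranges")); !ok {
		fmt.Println("too many ranges, skip building a cache key")
	}
}
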
