store/copr: enable store batch by default & add extra copr concurrency and batch nums in stats #40711

Merged: 25 commits, Feb 8, 2023

Commits
3578227
add copr concurrency status
you06 Jan 18, 2023
d6399ea
record next wait time for IndexLookUp
you06 Feb 6, 2023
c58dcf4
Merge branch 'master' into copr/conc-details
you06 Feb 6, 2023
733b551
record duration when stats exist
you06 Feb 6, 2023
055fa93
Merge branch 'master' into copr/conc-details
you06 Feb 6, 2023
91c283c
update
you06 Feb 6, 2023
b9fc171
Merge branch 'master' into copr/conc-details
you06 Feb 7, 2023
1a530e6
Update executor/distsql.go
you06 Feb 7, 2023
0eb90b3
Update executor/distsql.go
you06 Feb 7, 2023
03265ae
update field name
you06 Feb 7, 2023
bd2607c
Merge branch 'master' into copr/conc-details
you06 Feb 7, 2023
a9cd7de
add telemetry
you06 Feb 7, 2023
b4bcb2b
Merge branch 'master' into copr/conc-details
you06 Feb 7, 2023
8591a8b
add telemetry of batched tasks
you06 Feb 8, 2023
b1e0914
update bazel
you06 Feb 8, 2023
e492b66
Merge branch 'master' into copr/conc-details
you06 Feb 8, 2023
0eed285
add comment & only batch small tasks
you06 Feb 8, 2023
b1d0986
Merge branch 'master' into copr/conc-details
you06 Feb 8, 2023
8b578b7
Merge branch 'master' into copr/conc-details
ti-chi-bot Feb 8, 2023
e076493
Merge branch 'master' into copr/conc-details
ti-chi-bot Feb 8, 2023
7311c98
check diff to avoid affected by other tests
you06 Feb 8, 2023
a84c5db
fix runtime stats test
you06 Feb 8, 2023
8ffb40c
Merge branch 'master' into copr/conc-details
you06 Feb 8, 2023
8869b4d
Merge branch 'master' into copr/conc-details
ti-chi-bot Feb 8, 2023
57f6ee6
Merge branch 'master' into copr/conc-details
ti-chi-bot Feb 8, 2023
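For reference, the counters this PR adds all surface in the `cop_task` block of the runtime statistics string. The snippet below is not part of the PR; it is a hypothetical helper that would only compile inside the `distsql` package (the type is unexported), with invented values, sketching where the new fields end up:

```go
package distsql

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/tikv"
)

// showNewCopTaskFields is an illustrative helper (not in the PR): it fills the
// fields added by this change and prints the resulting cop_task summary.
func showNewCopTaskFields() {
	s := &selectResultRuntimeStats{
		copRespTime:             []time.Duration{time.Second},
		procKeys:                []int64{100},
		rpcStat:                 tikv.NewRegionRequestRuntimeStats(),
		CoprCacheHitNum:         3,
		distSQLConcurrency:      15, // printed as max_distsql_concurrency
		extraConcurrency:        4,  // printed as max_extra_concurrency
		storeBatchedNum:         8,  // printed as store_batch_num
		storeBatchedFallbackNum: 1,  // printed as store_batch_fallback_num
		buildTaskDuration:       2 * time.Millisecond,
	}
	// Roughly: cop_task: {num: 1, max: 1s, proc_keys: 100,
	//   copr_cache_hit_ratio: 0.33, build_task_duration: 2ms,
	//   max_distsql_concurrency: 15, max_extra_concurrency: 4,
	//   store_batch_num: 8, store_batch_fallback_num: 1}
	fmt.Println(s.String())
}
```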
4 changes: 2 additions & 2 deletions distsql/distsql_test.go
@@ -124,7 +124,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 2ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
@@ -135,7 +135,7 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, max_distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
106 changes: 87 additions & 19 deletions distsql/select_result.go
@@ -53,6 +53,12 @@ var (
errQueryInterrupted = dbterror.ClassExecutor.NewStd(errno.ErrQueryInterrupted)
)

var (
telemetryBatchedQueryTaskCnt = metrics.TelemetryBatchedQueryTaskCnt
telemetryStoreBatchedCnt = metrics.TelemetryStoreBatchedCnt
telemetryStoreBatchedFallbackCnt = metrics.TelemetryStoreBatchedFallbackCnt
)

var (
_ SelectResult = (*selectResult)(nil)
_ SelectResult = (*serialSelectResults)(nil)
@@ -157,7 +163,7 @@ func (r *selectResult) fetchResp(ctx context.Context) error {
if r.stats != nil {
// Ignore internal sql.
if !r.ctx.GetSessionVars().InRestrictedSQL && len(r.stats.copRespTime) > 0 {
ratio := float64(r.stats.CoprCacheHitNum) / float64(len(r.stats.copRespTime))
ratio := r.stats.calcCacheHit()
if ratio >= 1 {
telemetry.CurrentCoprCacheHitRatioGTE100Count.Inc()
}
@@ -364,6 +370,11 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: r.distSQLConcurrency,
}
if ci, ok := r.resp.(copr.CopInfo); ok {
conc, extraConc := ci.GetConcurrency()
r.stats.distSQLConcurrency = conc
r.stats.extraConcurrency = extraConc
}
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

@@ -455,26 +466,42 @@ func (r *selectResult) Close() error {
r.memConsume(-respSize)
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
defer func() {
if ci, ok := r.resp.(copr.CopInfo); ok {
r.stats.buildTaskDuration = ci.GetBuildTaskElapsed()
batched, fallback := ci.GetStoreBatchInfo()
if batched != 0 || fallback != 0 {
r.stats.storeBatchedNum, r.stats.storeBatchedFallbackNum = batched, fallback
telemetryStoreBatchedCnt.Add(float64(r.stats.storeBatchedNum))
telemetryStoreBatchedFallbackCnt.Add(float64(r.stats.storeBatchedFallbackNum))
telemetryBatchedQueryTaskCnt.Add(float64(len(r.stats.copRespTime)))
}
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}()
}
return r.resp.Close()
}

// CopRuntimeStats is a interface uses to check whether the result has cop runtime stats.
// CopRuntimeStats is an interface uses to check whether the result has cop runtime stats.
type CopRuntimeStats interface {
// GetCopRuntimeStats gets the cop runtime stats information.
GetCopRuntimeStats() *copr.CopRuntimeStats
}

type selectResultRuntimeStats struct {
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
distSQLConcurrency int
CoprCacheHitNum int64
copRespTime []time.Duration
procKeys []int64
backoffSleep map[string]time.Duration
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
distSQLConcurrency int
extraConcurrency int
CoprCacheHitNum int64
storeBatchedNum uint64
storeBatchedFallbackNum uint64
buildTaskDuration time.Duration
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntimeStats, respTime time.Duration) {
@@ -495,12 +522,16 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *copr.CopRuntim

func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
distSQLConcurrency: s.distSQLConcurrency,
extraConcurrency: s.extraConcurrency,
CoprCacheHitNum: s.CoprCacheHitNum,
storeBatchedNum: s.storeBatchedNum,
storeBatchedFallbackNum: s.storeBatchedFallbackNum,
buildTaskDuration: s.buildTaskDuration,
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
@@ -528,6 +559,15 @@ func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
if other.distSQLConcurrency > s.distSQLConcurrency {
s.distSQLConcurrency = other.distSQLConcurrency
}
if other.extraConcurrency > s.extraConcurrency {
s.extraConcurrency = other.extraConcurrency
}
s.storeBatchedNum += other.storeBatchedNum
s.storeBatchedFallbackNum += other.storeBatchedFallbackNum
s.buildTaskDuration += other.buildTaskDuration
}

func (s *selectResultRuntimeStats) String() string {
@@ -579,14 +619,30 @@ func (s *selectResultRuntimeStats) String() string {
}
if config.GetGlobalConfig().TiKVClient.CoprCache.CapacityMB > 0 {
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
strconv.FormatFloat(s.calcCacheHit(), 'f', 2, 64)))
} else {
buf.WriteString(", copr_cache: disabled")
}
if s.buildTaskDuration > 0 {
buf.WriteString(", build_task_duration: ")
buf.WriteString(execdetails.FormatDuration(s.buildTaskDuration))
}
if s.distSQLConcurrency > 0 {
buf.WriteString(", distsql_concurrency: ")
buf.WriteString(", max_distsql_concurrency: ")
buf.WriteString(strconv.FormatInt(int64(s.distSQLConcurrency), 10))
}
if s.extraConcurrency > 0 {
buf.WriteString(", max_extra_concurrency: ")
buf.WriteString(strconv.FormatInt(int64(s.extraConcurrency), 10))
}
if s.storeBatchedNum > 0 {
buf.WriteString(", store_batch_num: ")
buf.WriteString(strconv.FormatInt(int64(s.storeBatchedNum), 10))
}
if s.storeBatchedFallbackNum > 0 {
buf.WriteString(", store_batch_fallback_num: ")
buf.WriteString(strconv.FormatInt(int64(s.storeBatchedFallbackNum), 10))
}
buf.WriteString("}")
}

@@ -615,3 +671,15 @@ func (s *selectResultRuntimeStats) String() string {
func (*selectResultRuntimeStats) Tp() int {
return execdetails.TpSelectResultRuntimeStats
}

func (s *selectResultRuntimeStats) calcCacheHit() float64 {
hit := s.CoprCacheHitNum
tot := len(s.copRespTime)
if s.storeBatchedNum > 0 {
tot += int(s.storeBatchedNum)
}
if tot == 0 {
return 0
}
return float64(hit) / float64(tot)
}
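Note the denominator change in `calcCacheHit`: with store batching, tasks folded into another task's request do not produce their own cop response, so dividing hits by `len(copRespTime)` alone could report a ratio above 1. A tiny self-contained illustration (numbers invented):

```go
package main

import "fmt"

func main() {
	// Hypothetical run: 4 cop responses came back, 6 more tasks were folded
	// into them by store batching, and 5 of the logical tasks hit the cache.
	cacheHit, copResp, storeBatched := 5.0, 4.0, 6.0
	fmt.Printf("hits / responses           = %.2f\n", cacheHit/copResp)                // 1.25, misleading
	fmt.Printf("hits / (responses+batched) = %.2f\n", cacheHit/(copResp+storeBatched)) // 0.50
}
```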
1 change: 1 addition & 0 deletions executor/adapter.go
@@ -207,6 +207,7 @@ type TelemetryInfo struct {
PartitionTelemetry *PartitionTelemetryInfo
AccountLockTelemetry *AccountLockTelemetryInfo
UseIndexMerge bool
UseTableLookUp bool
}

// PartitionTelemetryInfo records table partition telemetry information during execution.
7 changes: 7 additions & 0 deletions executor/builder.go
@@ -3863,6 +3863,9 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
if b.Ti != nil {
b.Ti.UseTableLookUp = true
}
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil {
b.err = err
@@ -4000,6 +4003,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd
func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
if b.Ti != nil {
b.Ti.UseIndexMerge = true
b.Ti.UseTableLookUp = true
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil {
@@ -4445,6 +4449,9 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte

func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexLookUpReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (Executor, error) {
if builder.Ti != nil {
builder.Ti.UseTableLookUp = true
}
e, err := buildNoRangeIndexLookUpReader(builder.executorBuilder, v)
if err != nil {
return nil, err
40 changes: 39 additions & 1 deletion executor/distsql.go
@@ -76,7 +76,9 @@ type lookupTableTask struct {
idxRows *chunk.Chunk
cursor int

doneCh chan error
// after the cop task is built, buildDone will be set to the current instant, for Next wait duration statistic.
buildDoneTime time.Time
doneCh chan error

// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
@@ -790,13 +792,32 @@ func (e *IndexLookUpExecutor) getResultTask() (*lookupTableTask, error) {
if e.resultCurr != nil && e.resultCurr.cursor < len(e.resultCurr.rows) {
return e.resultCurr, nil
}
var (
enableStats = e.stats != nil
start time.Time
indexFetchedInstant time.Time
)
if enableStats {
start = time.Now()
}
task, ok := <-e.resultCh
if !ok {
return nil, nil
}
if enableStats {
indexFetchedInstant = time.Now()
}
if err := <-task.doneCh; err != nil {
return nil, err
}
if enableStats {
e.stats.NextWaitIndexScan += indexFetchedInstant.Sub(start)
if task.buildDoneTime.After(indexFetchedInstant) {
e.stats.NextWaitTableLookUpBuild += task.buildDoneTime.Sub(indexFetchedInstant)
indexFetchedInstant = task.buildDoneTime
}
e.stats.NextWaitTableLookUpResp += time.Since(indexFetchedInstant)
}

// Release the memory usage of last task before we handle a new task.
if e.resultCurr != nil {
@@ -1119,6 +1140,10 @@ type IndexLookUpRunTimeStats struct {
TableRowScan int64
TableTaskNum int64
Concurrency int
// Record the `Next` call affected wait duration details.
NextWaitIndexScan time.Duration
NextWaitTableLookUpBuild time.Duration
NextWaitTableLookUpResp time.Duration
}

func (e *IndexLookUpRunTimeStats) String() string {
@@ -1142,6 +1167,15 @@ func (e *IndexLookUpRunTimeStats) String() string {
}
buf.WriteString(fmt.Sprintf(" table_task: {total_time: %v, num: %d, concurrency: %d}", execdetails.FormatDuration(time.Duration(tableScan)), tableTaskNum, concurrency))
}
if e.NextWaitIndexScan > 0 || e.NextWaitTableLookUpBuild > 0 || e.NextWaitTableLookUpResp > 0 {
if buf.Len() > 0 {
buf.WriteByte(',')
fmt.Fprintf(&buf, " next: {wait_index: %s, wait_table_lookup_build: %s, wait_table_lookup_resp: %s}",
execdetails.FormatDuration(e.NextWaitIndexScan),
execdetails.FormatDuration(e.NextWaitTableLookUpBuild),
execdetails.FormatDuration(e.NextWaitTableLookUpResp))
}
}
return buf.String()
}

@@ -1162,6 +1196,9 @@ func (e *IndexLookUpRunTimeStats) Merge(other execdetails.RuntimeStats) {
e.TaskWait += tmp.TaskWait
e.TableRowScan += tmp.TableRowScan
e.TableTaskNum += tmp.TableTaskNum
e.NextWaitIndexScan += tmp.NextWaitIndexScan
e.NextWaitTableLookUpBuild += tmp.NextWaitTableLookUpBuild
e.NextWaitTableLookUpResp += tmp.NextWaitTableLookUpResp
}

// Tp implements the RuntimeStats interface.
@@ -1300,6 +1337,7 @@ func getDatumRow(r *chunk.Row, fields []*types.FieldType) []types.Datum {
// Then we hold the returning rows and finish this task.
func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) error {
tableReader, err := w.idxLookup.buildTableReader(ctx, task)
task.buildDoneTime = time.Now()
if err != nil {
logutil.Logger(ctx).Error("build table reader failed", zap.Error(err))
return err
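With that, `getResultTask` splits the time `Next` spends blocked into three segments: waiting for the index scan to hand over a task, waiting for the table-reader cop tasks to be built, and waiting for the table lookup response. A stripped-down sketch of the same bookkeeping (not from the PR; channel and task shapes are simplified, names follow the PR):

```go
package main

import (
	"fmt"
	"time"
)

// task mimics lookupTableTask: the table worker stamps buildDoneTime once the
// table reader is built, then signals doneCh when the looked-up rows are ready.
type task struct {
	buildDoneTime time.Time
	doneCh        chan error
}

// waitStats mirrors the three fields added to IndexLookUpRunTimeStats.
type waitStats struct {
	NextWaitIndexScan        time.Duration
	NextWaitTableLookUpBuild time.Duration
	NextWaitTableLookUpResp  time.Duration
}

// getResultTask is a simplified version of the instrumented wait logic.
func getResultTask(resultCh <-chan *task, s *waitStats) (*task, error) {
	start := time.Now()
	t := <-resultCh // blocked until the index scan produces a task
	indexFetched := time.Now()
	if err := <-t.doneCh; err != nil { // blocked until the table lookup finishes
		return nil, err
	}
	s.NextWaitIndexScan += indexFetched.Sub(start)
	if t.buildDoneTime.After(indexFetched) {
		// Part of the remaining wait was spent building the table-reader task.
		s.NextWaitTableLookUpBuild += t.buildDoneTime.Sub(indexFetched)
		indexFetched = t.buildDoneTime
	}
	s.NextWaitTableLookUpResp += time.Since(indexFetched)
	return t, nil
}

func main() {
	resultCh := make(chan *task, 1)
	t := &task{doneCh: make(chan error, 1)}
	go func() {
		time.Sleep(5 * time.Millisecond) // pretend: index scan
		resultCh <- t
		time.Sleep(5 * time.Millisecond) // pretend: building the table reader
		t.buildDoneTime = time.Now()
		time.Sleep(10 * time.Millisecond) // pretend: fetching the rows
		t.doneCh <- nil
	}()
	var s waitStats
	if _, err := getResultTask(resultCh, &s); err != nil {
		panic(err)
	}
	fmt.Println("wait_index:", s.NextWaitIndexScan)
	fmt.Println("wait_table_lookup_build:", s.NextWaitTableLookUpBuild)
	fmt.Println("wait_table_lookup_resp:", s.NextWaitTableLookUpResp)
}
```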
23 changes: 15 additions & 8 deletions executor/distsql_test.go
@@ -358,17 +358,24 @@ func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {

func TestIndexLookUpStats(t *testing.T) {
stats := &executor.IndexLookUpRunTimeStats{
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
NextWaitIndexScan: time.Second,
NextWaitTableLookUpBuild: 2 * time.Second,
NextWaitTableLookUpResp: 3 * time.Second,
}
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}"+
", table_task: {total_time: 2s, num: 2, concurrency: 1}"+
", next: {wait_index: 1s, wait_table_lookup_build: 2s, wait_table_lookup_resp: 3s}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}"+
", table_task: {total_time: 4s, num: 4, concurrency: 1}"+
", next: {wait_index: 2s, wait_table_lookup_build: 4s, wait_table_lookup_resp: 6s}", stats.String())
}

func TestIndexLookUpGetResultChunk(t *testing.T) {