diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 39f12a5a1daec..701b33c276e60 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -869,8 +869,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) { copStats = runtimeStatsColl.GetCopStats(explainID) } - memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) - diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID()) + memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID()) + diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID()) return } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 2d483e167df61..4c0670e93eb97 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -32,17 +32,17 @@ import ( // ExecDetails contains execution detail information. type ExecDetails struct { - CalleeAddress string - CopTime time.Duration - BackoffTime time.Duration - LockKeysDuration time.Duration - BackoffSleep map[string]time.Duration - BackoffTimes map[string]int - RequestCount int CommitDetail *util.CommitDetails LockKeysDetail *util.LockKeysDetails ScanDetail *util.ScanDetail + BackoffSleep map[string]time.Duration + BackoffTimes map[string]int + CalleeAddress string TimeDetail util.TimeDetail + RequestCount int + CopTime time.Duration + BackoffTime time.Duration + LockKeysDuration time.Duration } type stmtExecDetailKeyType struct{} @@ -318,9 +318,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) { } type basicCopRuntimeStats struct { - BasicRuntimeStats - threads int32 storeType string + BasicRuntimeStats + threads int32 } // String implements the RuntimeStats interface. @@ -359,8 +359,6 @@ func (*basicCopRuntimeStats) Tp() int { // CopRuntimeStats collects cop tasks' execution info. type CopRuntimeStats struct { - sync.Mutex - // stats stores the runtime statistics of coprocessor tasks. // The key of the map is the tikv-server address. Because a tikv-server can // have many region leaders, several coprocessor tasks can be sent to the @@ -370,28 +368,30 @@ type CopRuntimeStats struct { scanDetail *util.ScanDetail // do not use kv.StoreType because it will meet cycle import error storeType string + // count CopRuntimeStats total rows + totalRows int64 + sync.Mutex } // RecordOneCopTask records a specific cop tasks's execution detail. func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) { crs.Lock() defer crs.Unlock() + currentRows := int64(*summary.NumProducedRows) + crs.totalRows += currentRows crs.stats[address] = append(crs.stats[address], &basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations), consume: int64(*summary.TimeProcessedNs), - rows: int64(*summary.NumProducedRows)}, + rows: currentRows}, threads: int32(summary.GetConcurrency()), storeType: crs.storeType}) } // GetActRows return total rows of CopRuntimeStats. func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { - for _, instanceStats := range crs.stats { - for _, stat := range instanceStats { - totalRows += stat.rows - } - } - return totalRows + crs.Lock() + defer crs.Unlock() + return crs.totalRows } // MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information. @@ -635,9 +635,9 @@ func (e *BasicRuntimeStats) GetTime() int64 { // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { - mu sync.Mutex rootStats map[int]*RootRuntimeStats copStats map[int]*CopRuntimeStats + mu sync.Mutex } // NewRuntimeStatsColl creates new executor collector. @@ -786,10 +786,8 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { // RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo. type RuntimeStatsWithConcurrencyInfo struct { - // protect concurrency - sync.Mutex - // executor concurrency information concurrency []*ConcurrencyInfo + sync.Mutex } // Tp implements the RuntimeStats interface. @@ -840,8 +838,8 @@ func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {} // RuntimeStatsWithCommit is the RuntimeStats with commit detail. type RuntimeStatsWithCommit struct { Commit *util.CommitDetails - TxnCnt int LockKeys *util.LockKeysDetails + TxnCnt int } // Tp implements the RuntimeStats interface. diff --git a/util/memory/tracker.go b/util/memory/tracker.go index c32af4d3af82e..9de4c721aaa8e 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -563,6 +563,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { return nil } +// SearchTrackerWithLock searches the specific tracker under this tracker with lock. +func (t *Tracker) SearchTrackerWithLock(label int) *Tracker { + t.mu.Lock() + defer t.mu.Unlock() + if t.label == label { + return t + } + children := t.mu.children[label] + if len(children) > 0 { + return children[0] + } + return nil +} + // SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes. func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) { t.mu.Lock()