From 1176e6cfcac37451ddfafce9fe3b1dfcb9b81935 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Fri, 28 Oct 2022 18:19:21 +0800 Subject: [PATCH 1/4] fix panic cause by concurrent map iteration and write in memory usage alarm --- util/execdetails/execdetails.go | 49 +++++++++++++-------------------- 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 2d483e167df61..079c8f2061483 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -32,17 +32,17 @@ import ( // ExecDetails contains execution detail information. type ExecDetails struct { - CalleeAddress string - CopTime time.Duration - BackoffTime time.Duration - LockKeysDuration time.Duration - BackoffSleep map[string]time.Duration - BackoffTimes map[string]int - RequestCount int CommitDetail *util.CommitDetails LockKeysDetail *util.LockKeysDetails ScanDetail *util.ScanDetail + BackoffSleep map[string]time.Duration + BackoffTimes map[string]int + CalleeAddress string TimeDetail util.TimeDetail + RequestCount int + CopTime time.Duration + BackoffTime time.Duration + LockKeysDuration time.Duration } type stmtExecDetailKeyType struct{} @@ -318,9 +318,9 @@ func (d ExecDetails) ToZapFields() (fields []zap.Field) { } type basicCopRuntimeStats struct { - BasicRuntimeStats - threads int32 storeType string + BasicRuntimeStats + threads int32 } // String implements the RuntimeStats interface. @@ -359,39 +359,30 @@ func (*basicCopRuntimeStats) Tp() int { // CopRuntimeStats collects cop tasks' execution info. type CopRuntimeStats struct { - sync.Mutex - - // stats stores the runtime statistics of coprocessor tasks. - // The key of the map is the tikv-server address. Because a tikv-server can - // have many region leaders, several coprocessor tasks can be sent to the - // same tikv-server instance. We have to use a list to maintain all tasks - // executed on each instance. stats map[string][]*basicCopRuntimeStats scanDetail *util.ScanDetail - // do not use kv.StoreType because it will meet cycle import error - storeType string + storeType string + totalRows int64 + sync.Mutex } // RecordOneCopTask records a specific cop tasks's execution detail. func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.ExecutorExecutionSummary) { crs.Lock() defer crs.Unlock() + currentRows := int64(*summary.NumProducedRows) + crs.totalRows += currentRows crs.stats[address] = append(crs.stats[address], &basicCopRuntimeStats{BasicRuntimeStats: BasicRuntimeStats{loop: int32(*summary.NumIterations), consume: int64(*summary.TimeProcessedNs), - rows: int64(*summary.NumProducedRows)}, + rows: currentRows}, threads: int32(summary.GetConcurrency()), storeType: crs.storeType}) } // GetActRows return total rows of CopRuntimeStats. func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { - for _, instanceStats := range crs.stats { - for _, stat := range instanceStats { - totalRows += stat.rows - } - } - return totalRows + return crs.totalRows } // MergeBasicStats traverses basicCopRuntimeStats in the CopRuntimeStats and collects some useful information. @@ -635,9 +626,9 @@ func (e *BasicRuntimeStats) GetTime() int64 { // RuntimeStatsColl collects executors's execution info. type RuntimeStatsColl struct { - mu sync.Mutex rootStats map[int]*RootRuntimeStats copStats map[int]*CopRuntimeStats + mu sync.Mutex } // NewRuntimeStatsColl creates new executor collector. @@ -786,10 +777,8 @@ func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { // RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo. type RuntimeStatsWithConcurrencyInfo struct { - // protect concurrency - sync.Mutex - // executor concurrency information concurrency []*ConcurrencyInfo + sync.Mutex } // Tp implements the RuntimeStats interface. @@ -840,8 +829,8 @@ func (*RuntimeStatsWithConcurrencyInfo) Merge(RuntimeStats) {} // RuntimeStatsWithCommit is the RuntimeStats with commit detail. type RuntimeStatsWithCommit struct { Commit *util.CommitDetails - TxnCnt int LockKeys *util.LockKeysDetails + TxnCnt int } // Tp implements the RuntimeStats interface. From c179b0c8ad252aa6833e1dfa1285c4dcf5e972a9 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Fri, 28 Oct 2022 18:24:04 +0800 Subject: [PATCH 2/4] add comment --- util/execdetails/execdetails.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 079c8f2061483..5ebb0d8d111bc 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -359,10 +359,17 @@ func (*basicCopRuntimeStats) Tp() int { // CopRuntimeStats collects cop tasks' execution info. type CopRuntimeStats struct { + // stats stores the runtime statistics of coprocessor tasks. + // The key of the map is the tikv-server address. Because a tikv-server can + // have many region leaders, several coprocessor tasks can be sent to the + // same tikv-server instance. We have to use a list to maintain all tasks + // executed on each instance. stats map[string][]*basicCopRuntimeStats scanDetail *util.ScanDetail - storeType string - totalRows int64 + // do not use kv.StoreType because it will meet cycle import error + storeType string + // count CopRuntimeStats total rows + totalRows int64 sync.Mutex } From 46802ca296d986343f12de6fc41f0a9f993320bc Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Mon, 31 Oct 2022 17:56:39 +0800 Subject: [PATCH 3/4] add lock when get act rows --- util/execdetails/execdetails.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index 5ebb0d8d111bc..4c0670e93eb97 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -389,6 +389,8 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu // GetActRows return total rows of CopRuntimeStats. func (crs *CopRuntimeStats) GetActRows() (totalRows int64) { + crs.Lock() + defer crs.Unlock() return crs.totalRows } From 6d10d95e77ae8055200beb60b07e60189de2d151 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Tue, 1 Nov 2022 11:41:45 +0800 Subject: [PATCH 4/4] add function SearchTrackerWithLock --- planner/core/common_plans.go | 4 ++-- util/memory/tracker.go | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index e8fb5b0a486b8..9ef62bb316dfd 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -867,8 +867,8 @@ func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetail if runtimeStatsColl != nil && runtimeStatsColl.ExistsCopStats(explainID) { copStats = runtimeStatsColl.GetCopStats(explainID) } - memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithoutLock(p.ID()) - diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithoutLock(p.ID()) + memTracker = ctx.GetSessionVars().StmtCtx.MemTracker.SearchTrackerWithLock(p.ID()) + diskTracker = ctx.GetSessionVars().StmtCtx.DiskTracker.SearchTrackerWithLock(p.ID()) return } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index 0300dcb9e93fb..14ea1c75d8058 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -546,6 +546,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker { return nil } +// SearchTrackerWithLock searches the specific tracker under this tracker with lock. +func (t *Tracker) SearchTrackerWithLock(label int) *Tracker { + t.mu.Lock() + defer t.mu.Unlock() + if t.label == label { + return t + } + children := t.mu.children[label] + if len(children) > 0 { + return children[0] + } + return nil +} + // SearchTrackerConsumedMoreThanNBytes searches the specific tracker that consumes more than NBytes. func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) { t.mu.Lock()