From 4f9670c76611fd6a845637c2c132a402e2e69f55 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Mon, 28 Nov 2022 17:02:01 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #39368 Signed-off-by: ti-chi-bot --- session/session.go | 22 + sessionctx/stmtctx/stmtctx.go | 76 ++++ sessionctx/variable/session.go | 19 + util/expensivequery/expensivequery.go | 9 + util/memoryusagealarm/memoryusagealarm.go | 380 ++++++++++++++++++ .../memoryusagealarm/memoryusagealarm_test.go | 145 +++++++ util/processinfo.go | 21 + util/util.go | 100 +++++ util/util_test.go | 74 ++++ 9 files changed, 846 insertions(+) create mode 100644 util/memoryusagealarm/memoryusagealarm.go create mode 100644 util/memoryusagealarm/memoryusagealarm_test.go create mode 100644 util/util_test.go diff --git a/session/session.go b/session/session.go index 91b9c97ff765a..b2ccbfcf3d1a1 100644 --- a/session/session.go +++ b/session/session.go @@ -1394,6 +1394,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu p = explain.TargetPlan } pi := util.ProcessInfo{ +<<<<<<< HEAD ID: s.sessionVars.ConnectionID, Port: s.sessionVars.Port, DB: s.sessionVars.CurrentDB, @@ -1409,6 +1410,27 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu StatsInfo: plannercore.GetStatsInfo, MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, +======= + ID: s.sessionVars.ConnectionID, + Port: s.sessionVars.Port, + DB: s.sessionVars.CurrentDB, + Command: command, + Plan: p, + PlanExplainRows: plannercore.GetExplainRowsForPlan(p), + RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, + Time: t, + State: s.Status(), + Info: sql, + CurTxnStartTS: curTxnStartTS, + StmtCtx: s.sessionVars.StmtCtx, + RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, + MemTracker: s.sessionVars.MemTracker, + DiskTracker: s.sessionVars.DiskTracker, + StatsInfo: plannercore.GetStatsInfo, + OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), + MaxExecutionTime: maxExecutionTime, + RedactSQL: s.sessionVars.EnableRedactLog, +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 97c589d365164..588342d146501 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -60,6 +60,82 @@ type SQLWarn struct { Err error } +<<<<<<< HEAD +======= +type jsonSQLWarn struct { + Level string `json:"level"` + SQLErr *terror.Error `json:"err,omitempty"` + Msg string `json:"msg,omitempty"` +} + +// MarshalJSON implements the Marshaler.MarshalJSON interface. +func (warn *SQLWarn) MarshalJSON() ([]byte, error) { + w := &jsonSQLWarn{ + Level: warn.Level, + } + e := errors.Cause(warn.Err) + switch x := e.(type) { + case *terror.Error: + // Omit outter errors because only the most inner error matters. + w.SQLErr = x + default: + w.Msg = e.Error() + } + return json.Marshal(w) +} + +// UnmarshalJSON implements the Unmarshaler.UnmarshalJSON interface. +func (warn *SQLWarn) UnmarshalJSON(data []byte) error { + var w jsonSQLWarn + if err := json.Unmarshal(data, &w); err != nil { + return err + } + warn.Level = w.Level + if w.SQLErr != nil { + warn.Err = w.SQLErr + } else { + warn.Err = errors.New(w.Msg) + } + return nil +} + +// ReferenceCount indicates the reference count of StmtCtx. +type ReferenceCount int32 + +const ( + // ReferenceCountIsFrozen indicates the current StmtCtx is resetting, it'll refuse all the access from other sessions. + ReferenceCountIsFrozen int32 = -1 + // ReferenceCountNoReference indicates the current StmtCtx is not accessed by other sessions. + ReferenceCountNoReference int32 = 0 +) + +// TryIncrease tries to increase the reference count. +// There is a small chance that TryIncrease returns true while TryFreeze and +// UnFreeze are invoked successfully during the execution of TryIncrease. +func (rf *ReferenceCount) TryIncrease() bool { + refCnt := atomic.LoadInt32((*int32)(rf)) + for ; refCnt != ReferenceCountIsFrozen && !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt+1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } + return refCnt != ReferenceCountIsFrozen +} + +// Decrease decreases the reference count. +func (rf *ReferenceCount) Decrease() { + for refCnt := atomic.LoadInt32((*int32)(rf)); !atomic.CompareAndSwapInt32((*int32)(rf), refCnt, refCnt-1); refCnt = atomic.LoadInt32((*int32)(rf)) { + } +} + +// TryFreeze tries to freeze the StmtCtx to frozen before resetting the old StmtCtx. +func (rf *ReferenceCount) TryFreeze() bool { + return atomic.LoadInt32((*int32)(rf)) == ReferenceCountNoReference && atomic.CompareAndSwapInt32((*int32)(rf), ReferenceCountNoReference, ReferenceCountIsFrozen) +} + +// UnFreeze unfreeze the frozen StmtCtx thus the other session can access this StmtCtx. +func (rf *ReferenceCount) UnFreeze() { + atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference) +} + +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) // StatementContext contains variables for a statement. // It should be reset before executing a statement. type StatementContext struct { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index bcc8820d9aa00..7b4aebe422476 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -586,6 +586,11 @@ type SessionVars struct { // StmtCtx holds variables for current executing statement. StmtCtx *stmtctx.StatementContext + // RefCountOfStmtCtx indicates the reference count of StmtCtx. When the + // StmtCtx is accessed by other sessions, e.g. oom-alarm-handler/expensive-query-handler, add one first. + // Note: this variable should be accessed and updated by atomic operations. + RefCountOfStmtCtx stmtctx.ReferenceCount + // AllowAggPushDown can be set to false to forbid aggregation push down. AllowAggPushDown bool @@ -1049,9 +1054,23 @@ type SessionVars struct { // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { +<<<<<<< HEAD s.cached.curr = (s.cached.curr + 1) % 2 s.cached.data[s.cached.curr] = stmtctx.StatementContext{} return &s.cached.data[s.cached.curr] +======= + sc := &s.cachedStmtCtx[0] + if sc == s.StmtCtx { + sc = &s.cachedStmtCtx[1] + } + if s.RefCountOfStmtCtx.TryFreeze() { + *sc = stmtctx.StatementContext{} + s.RefCountOfStmtCtx.UnFreeze() + } else { + sc = &stmtctx.StatementContext{} + } + return sc +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) } // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 035e90aec0190..020afc9783c92 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -184,6 +184,15 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { } // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. +<<<<<<< HEAD func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) { logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...) +======= +func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { + fields := util.GenLogFields(costTime, info, true) + if fields == nil { + return + } + logutil.BgLogger().Warn(msg, fields...) +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) } diff --git a/util/memoryusagealarm/memoryusagealarm.go b/util/memoryusagealarm/memoryusagealarm.go new file mode 100644 index 0000000000000..c8a6fd0eaecda --- /dev/null +++ b/util/memoryusagealarm/memoryusagealarm.go @@ -0,0 +1,380 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memoryusagealarm + +import ( + "fmt" + "os" + "path/filepath" + rpprof "runtime/pprof" + "strings" + "sync/atomic" + "time" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "golang.org/x/exp/slices" +) + +// Handle is the handler for expensive query. +type Handle struct { + exitCh chan struct{} + sm atomic.Value +} + +// NewMemoryUsageAlarmHandle builds a memory usage alarm handler. +func NewMemoryUsageAlarmHandle(exitCh chan struct{}) *Handle { + return &Handle{exitCh: exitCh} +} + +// SetSessionManager sets the SessionManager which is used to fetching the info +// of all active sessions. +func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle { + eqh.sm.Store(sm) + return eqh +} + +// Run starts a memory usage alarm goroutine at the start time of the server. +func (eqh *Handle) Run() { + // use 100ms as tickInterval temply, may use given interval or use defined variable later + tickInterval := time.Millisecond * time.Duration(100) + ticker := time.NewTicker(tickInterval) + defer ticker.Stop() + sm := eqh.sm.Load().(util.SessionManager) + record := &memoryUsageAlarm{} + for { + select { + case <-ticker.C: + record.alarm4ExcessiveMemUsage(sm) + case <-eqh.exitCh: + return + } + } +} + +type memoryUsageAlarm struct { + lastCheckTime time.Time + lastUpdateVariableTime time.Time + err error + baseRecordDir string + lastRecordDirName []string + lastRecordMemUsed uint64 + memoryUsageAlarmRatio float64 + memoryUsageAlarmKeepRecordNum int64 + serverMemoryLimit uint64 + isServerMemoryLimitSet bool + initialized bool +} + +func (record *memoryUsageAlarm) updateVariable() { + if time.Since(record.lastUpdateVariableTime) < 60*time.Second { + return + } + record.memoryUsageAlarmRatio = variable.MemoryUsageAlarmRatio.Load() + record.memoryUsageAlarmKeepRecordNum = variable.MemoryUsageAlarmKeepRecordNum.Load() + record.serverMemoryLimit = memory.ServerMemoryLimit.Load() + if record.serverMemoryLimit != 0 { + record.isServerMemoryLimitSet = true + } else { + record.serverMemoryLimit, record.err = memory.MemTotal() + if record.err != nil { + logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err)) + return + } + record.isServerMemoryLimitSet = false + } + record.lastUpdateVariableTime = time.Now() +} + +func (record *memoryUsageAlarm) initMemoryUsageAlarmRecord() { + record.lastCheckTime = time.Time{} + record.lastUpdateVariableTime = time.Time{} + record.updateVariable() + tidbLogDir, _ := filepath.Split(config.GetGlobalConfig().Log.File.Filename) + record.baseRecordDir = filepath.Join(tidbLogDir, "oom_record") + if record.err = disk.CheckAndCreateDir(record.baseRecordDir); record.err != nil { + return + } + // Read last records + recordDirs, err := os.ReadDir(record.baseRecordDir) + if err != nil { + record.err = err + return + } + for _, dir := range recordDirs { + name := filepath.Join(record.baseRecordDir, dir.Name()) + if strings.Contains(dir.Name(), "record") { + record.lastRecordDirName = append(record.lastRecordDirName, name) + } + } + record.initialized = true +} + +// If Performance.ServerMemoryQuota is set, use `ServerMemoryQuota * MemoryUsageAlarmRatio` to check oom risk. +// If Performance.ServerMemoryQuota is not set, use `system total memory size * MemoryUsageAlarmRatio` to check oom risk. +func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm util.SessionManager) { + if !record.initialized { + record.initMemoryUsageAlarmRecord() + if record.err != nil { + return + } + } else { + record.updateVariable() + } + if record.memoryUsageAlarmRatio <= 0.0 || record.memoryUsageAlarmRatio >= 1.0 { + return + } + var memoryUsage uint64 + instanceStats := memory.ReadMemStats() + if record.isServerMemoryLimitSet { + memoryUsage = instanceStats.HeapAlloc + } else { + memoryUsage, record.err = memory.MemUsed() + if record.err != nil { + logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err)) + return + } + } + + // TODO: Consider NextGC to record SQLs. + if needRecord, reason := record.needRecord(memoryUsage); needRecord { + record.lastCheckTime = time.Now() + record.lastRecordMemUsed = memoryUsage + record.doRecord(memoryUsage, instanceStats.HeapAlloc, sm, reason) + record.tryRemoveRedundantRecords() + } +} + +// AlarmReason implements alarm reason. +type AlarmReason uint + +const ( + // GrowTooFast is the reason that memory increasing too fast. + GrowTooFast AlarmReason = iota + // ExceedAlarmRatio is the reason that memory used exceed threshold. + ExceedAlarmRatio + // NoReason means no alarm + NoReason +) + +func (reason AlarmReason) String() string { + return [...]string{"memory usage grows too fast", "memory usage exceeds alarm ratio", "no reason"}[reason] +} + +func (record *memoryUsageAlarm) needRecord(memoryUsage uint64) (bool, AlarmReason) { + // At least 60 seconds between two recordings that memory usage is less than threshold (default 70% system memory). + // If the memory is still exceeded, only records once. + // If the memory used ratio recorded this time is 0.1 higher than last time, we will force record this time. + if float64(memoryUsage) <= float64(record.serverMemoryLimit)*record.memoryUsageAlarmRatio { + return false, NoReason + } + + interval := time.Since(record.lastCheckTime) + memDiff := int64(memoryUsage) - int64(record.lastRecordMemUsed) + if interval > 60*time.Second { + return true, ExceedAlarmRatio + } + if float64(memDiff) > 0.1*float64(record.serverMemoryLimit) { + return true, GrowTooFast + } + return false, NoReason +} + +func (record *memoryUsageAlarm) doRecord(memUsage uint64, instanceMemoryUsage uint64, sm util.SessionManager, alarmReason AlarmReason) { + fields := make([]zap.Field, 0, 6) + fields = append(fields, zap.Bool("is tidb_server_memory_limit set", record.isServerMemoryLimitSet)) + if record.isServerMemoryLimitSet { + fields = append(fields, zap.Any("tidb_server_memory_limit", record.serverMemoryLimit)) + fields = append(fields, zap.Any("tidb-server memory usage", memUsage)) + } else { + fields = append(fields, zap.Any("system memory total", record.serverMemoryLimit)) + fields = append(fields, zap.Any("system memory usage", memUsage)) + fields = append(fields, zap.Any("tidb-server memory usage", instanceMemoryUsage)) + } + fields = append(fields, zap.Any("memory-usage-alarm-ratio", record.memoryUsageAlarmRatio)) + fields = append(fields, zap.Any("record path", record.baseRecordDir)) + logutil.BgLogger().Warn(fmt.Sprintf("tidb-server has the risk of OOM because of %s. Running SQLs and heap profile will be recorded in record path", alarmReason.String()), fields...) + recordDir := filepath.Join(record.baseRecordDir, "record"+record.lastCheckTime.Format(time.RFC3339)) + if record.err = disk.CheckAndCreateDir(recordDir); record.err != nil { + return + } + record.lastRecordDirName = append(record.lastRecordDirName, recordDir) + if record.err = record.recordSQL(sm, recordDir); record.err != nil { + return + } + if record.err = record.recordProfile(recordDir); record.err != nil { + return + } +} + +func (record *memoryUsageAlarm) tryRemoveRedundantRecords() { + filename := &record.lastRecordDirName + for len(*filename) > int(record.memoryUsageAlarmKeepRecordNum) { + err := os.RemoveAll((*filename)[0]) + if err != nil { + logutil.BgLogger().Error("remove temp files failed", zap.Error(err)) + } + *filename = (*filename)[1:] + } +} + +func getPlanString(info *util.ProcessInfo) string { + var buf strings.Builder + rows := info.PlanExplainRows + buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info")) + for _, row := range rows { + buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4])) + } + return buf.String() +} + +func (record *memoryUsageAlarm) printTop10SqlInfo(pinfo []*util.ProcessInfo, f *os.File) { + if _, err := f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n"); err != nil { + logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) + } + memBuf := record.getTop10SqlInfoByMemoryUsage(pinfo) + if _, err := f.WriteString(memBuf.String()); err != nil { + logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) + } + if _, err := f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n"); err != nil { + logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) + } + costBuf := record.getTop10SqlInfoByCostTime(pinfo) + if _, err := f.WriteString(costBuf.String()); err != nil { + logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) + } +} + +func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) bool, pinfo []*util.ProcessInfo) strings.Builder { + slices.SortFunc(pinfo, cmp) + list := pinfo + var buf strings.Builder + oomAction := variable.OOMAction.Load() + serverMemoryLimit := memory.ServerMemoryLimit.Load() + for i, totalCnt := 0, 10; i < len(list) && totalCnt > 0; i++ { + info := list[i] + buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) + fields := util.GenLogFields(record.lastCheckTime.Sub(info.Time), info, false) + if fields == nil { + continue + } + fields = append(fields, zap.String("tidb_mem_oom_action", oomAction)) + fields = append(fields, zap.Uint64("tidb_server_memory_limit", serverMemoryLimit)) + fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery)) + fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion)) + fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction)) + fields = append(fields, zap.String("current_analyze_plan", getPlanString(info))) + for _, field := range fields { + switch field.Type { + case zapcore.StringType: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.String)) + case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, uint64(field.Integer))) + case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer)) + case zapcore.BoolType: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer == 1)) + } + buf.WriteString("\n") + } + totalCnt-- + } + buf.WriteString("\n") + return buf +} + +func (record *memoryUsageAlarm) getTop10SqlInfoByMemoryUsage(pinfo []*util.ProcessInfo) strings.Builder { + return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { + return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() + }, pinfo) +} + +func (record *memoryUsageAlarm) getTop10SqlInfoByCostTime(pinfo []*util.ProcessInfo) strings.Builder { + return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { + return i.Time.Before(j.Time) + }, pinfo) +} + +func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir string) error { + processInfo := sm.ShowProcessList() + pinfo := make([]*util.ProcessInfo, 0, len(processInfo)) + for _, info := range processInfo { + if len(info.Info) != 0 { + pinfo = append(pinfo, info) + } + } + fileName := filepath.Join(recordDir, "running_sql") + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error("create oom record file fail", zap.Error(err)) + return err + } + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error("close oom record file fail", zap.Error(err)) + } + }() + record.printTop10SqlInfo(pinfo, f) + return nil +} + +type item struct { + Name string + Debug int +} + +func (*memoryUsageAlarm) recordProfile(recordDir string) error { + items := []item{ + {Name: "heap"}, + {Name: "goroutine", Debug: 2}, + } + for _, item := range items { + if err := write(item, recordDir); err != nil { + return err + } + } + return nil +} + +func write(item item, recordDir string) error { + fileName := filepath.Join(recordDir, item.Name) + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.Name), zap.Error(err)) + return err + } + p := rpprof.Lookup(item.Name) + err = p.WriteTo(f, item.Debug) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.Name), zap.Error(err)) + return err + } + + //nolint: revive + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.Name), zap.Error(err)) + } + }() + return nil +} diff --git a/util/memoryusagealarm/memoryusagealarm_test.go b/util/memoryusagealarm/memoryusagealarm_test.go new file mode 100644 index 0000000000000..6e5147805676f --- /dev/null +++ b/util/memoryusagealarm/memoryusagealarm_test.go @@ -0,0 +1,145 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package memoryusagealarm + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/memory" + "github.com/stretchr/testify/assert" +) + +func TestIfNeedDoRecord(t *testing.T) { + record := memoryUsageAlarm{} + record.initMemoryUsageAlarmRecord() + + // mem usage ratio < 70% will not be recorded + memUsed := 0.69 * float64(record.serverMemoryLimit) + needRecord, reason := record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // mem usage ratio > 70% will not be recorded + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, ExceedAlarmRatio, reason) + record.lastCheckTime = time.Now() + record.lastRecordMemUsed = uint64(memUsed) + + // check time - last record time < 60s will not be recorded + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // check time - last record time > 60s will be recorded + record.lastCheckTime = record.lastCheckTime.Add(-60 * time.Second) + memUsed = 0.71 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, ExceedAlarmRatio, reason) + record.lastCheckTime = time.Now() + record.lastRecordMemUsed = uint64(memUsed) + + // mem usage ratio - last mem usage ratio < 10% will not be recorded + memUsed = 0.80 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.False(t, needRecord) + assert.Equal(t, NoReason, reason) + + // mem usage ratio - last mem usage ratio > 10% will not be recorded even though check time - last record time + memUsed = 0.82 * float64(record.serverMemoryLimit) + needRecord, reason = record.needRecord(uint64(memUsed)) + assert.True(t, needRecord) + assert.Equal(t, GrowTooFast, reason) +} + +func genTime(sec int64) time.Time { + minStartTime := time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).Unix() + return time.Unix(minStartTime+sec, 0) +} + +func TestGetTop10Sql(t *testing.T) { + record := memoryUsageAlarm{} + record.initMemoryUsageAlarmRecord() + record.lastCheckTime = genTime(123456) + + processInfoList := genMockProcessInfoList([]int64{1000, 87263523, 34223}, + []time.Time{genTime(1234), genTime(123456), genTime(12)}, + 3) + actual := record.getTop10SqlInfoByMemoryUsage(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", + actual.String()) + actual = record.getTop10SqlInfoByCostTime(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) + + processInfoList = genMockProcessInfoList([]int64{1000, 87263523, 34223, 532355, 123225151, 231231515, 12312, 12515134234, 232, 12414, 15263236, 123123123, 15}, + []time.Time{genTime(1234), genTime(123456), genTime(12), genTime(3241), genTime(12515), genTime(3215), genTime(61314), genTime(12234), genTime(1123), genTime(512), genTime(11111), genTime(22222), genTime(5512)}, + 13) + actual = record.getTop10SqlInfoByMemoryUsage(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 101234s\ntxn_start_ts: 0\nmem_max: 123123123 Bytes (117.4 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 62142s\ntxn_start_ts: 0\nmem_max: 12312 Bytes (12.0 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) + actual = record.getTop10SqlInfoByCostTime(processInfoList) + assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122333s\ntxn_start_ts: 0\nmem_max: 232 Bytes (232 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 117944s\ntxn_start_ts: 0\nmem_max: 15 Bytes (15 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) +} + +func genMockProcessInfoList(memConsumeList []int64, startTimeList []time.Time, size int) []*util.ProcessInfo { + processInfoList := make([]*util.ProcessInfo, 0, size) + for i := 0; i < size; i++ { + tracker := memory.NewTracker(0, 0) + tracker.Consume(memConsumeList[i]) + var stmtCtxRefCount stmtctx.ReferenceCount = 0 + processInfo := util.ProcessInfo{Time: startTimeList[i], + StmtCtx: &stmtctx.StatementContext{}, + MemTracker: tracker, + StatsInfo: func(interface{}) map[string]uint64 { + return map[string]uint64{} + }, + RefCountOfStmtCtx: &stmtCtxRefCount, + } + processInfoList = append(processInfoList, &processInfo) + } + return processInfoList +} + +func TestUpdateVariables(t *testing.T) { + variable.MemoryUsageAlarmRatio.Store(0.3) + variable.MemoryUsageAlarmKeepRecordNum.Store(3) + memory.ServerMemoryLimit.Store(1024) + + record := memoryUsageAlarm{} + + record.initMemoryUsageAlarmRecord() + assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(1024), record.serverMemoryLimit) + variable.MemoryUsageAlarmRatio.Store(0.6) + variable.MemoryUsageAlarmKeepRecordNum.Store(6) + memory.ServerMemoryLimit.Store(2048) + + record.updateVariable() + assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(1024), record.serverMemoryLimit) + record.lastUpdateVariableTime = record.lastUpdateVariableTime.Add(-60 * time.Second) + record.updateVariable() + assert.Equal(t, 0.6, record.memoryUsageAlarmRatio) + assert.Equal(t, int64(6), record.memoryUsageAlarmKeepRecordNum) + assert.Equal(t, uint64(2048), record.serverMemoryLimit) +} diff --git a/util/processinfo.go b/util/processinfo.go index e9f496f73f60a..3c0567cf7e399 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,6 +31,7 @@ import ( // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { +<<<<<<< HEAD ID uint64 User string Host string @@ -45,6 +46,26 @@ type ProcessInfo struct { CurTxnStartTS uint64 StmtCtx *stmtctx.StatementContext StatsInfo func(interface{}) map[string]uint64 +======= + Time time.Time + Plan interface{} + StmtCtx *stmtctx.StatementContext + RefCountOfStmtCtx *stmtctx.ReferenceCount + MemTracker *memory.Tracker + DiskTracker *disk.Tracker + StatsInfo func(interface{}) map[string]uint64 + RuntimeStatsColl *execdetails.RuntimeStatsColl + DB string + Digest string + Host string + User string + Info string + Port string + PlanExplainRows [][]string + OOMAlarmVariablesInfo OOMAlarmVariablesInfo + ID uint64 + CurTxnStartTS uint64 +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) // MaxExecutionTime is the timeout for select statement, in milliseconds. // If the query takes too long, kill it. MaxExecutionTime uint64 diff --git a/util/util.go b/util/util.go index 9b1a9b52840fc..d37192db03c78 100644 --- a/util/util.go +++ b/util/util.go @@ -71,3 +71,103 @@ func GetJSON(client *http.Client, url string, v interface{}) error { return errors.Trace(json.NewDecoder(resp.Body).Decode(v)) } +<<<<<<< HEAD +======= + +// ChanMap creates a channel which applies the function over the input Channel. +// Hint of Resource Leakage: +// In golang, channel isn't an interface so we must create a goroutine for handling the inputs. +// Hence the input channel must be closed properly or this function may leak a goroutine. +func ChanMap[T, R any](c <-chan T, f func(T) R) <-chan R { + outCh := make(chan R) + go func() { + defer close(outCh) + for item := range c { + outCh <- f(item) + } + }() + return outCh +} + +// Str2Int64Map converts a string to a map[int64]struct{}. +func Str2Int64Map(str string) map[int64]struct{} { + strs := strings.Split(str, ",") + res := make(map[int64]struct{}, len(strs)) + for _, s := range strs { + id, _ := strconv.ParseInt(s, 10, 64) + res[id] = struct{}{} + } + return res +} + +// GenLogFields generate log fields. +func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL bool) []zap.Field { + if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { + return nil + } + defer info.RefCountOfStmtCtx.Decrease() + + logFields := make([]zap.Field, 0, 20) + logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) + execDetail := info.StmtCtx.GetExecDetails() + logFields = append(logFields, execDetail.ToZapFields()...) + if copTaskInfo := info.StmtCtx.CopTasksDetails(); copTaskInfo != nil { + logFields = append(logFields, copTaskInfo.ToZapFields()...) + } + if statsInfo := info.StatsInfo(info.Plan); len(statsInfo) > 0 { + var buf strings.Builder + firstComma := false + vStr := "" + for k, v := range statsInfo { + if v == 0 { + vStr = "pseudo" + } else { + vStr = strconv.FormatUint(v, 10) + } + if firstComma { + buf.WriteString("," + k + ":" + vStr) + } else { + buf.WriteString(k + ":" + vStr) + firstComma = true + } + } + logFields = append(logFields, zap.String("stats", buf.String())) + } + if info.ID != 0 { + logFields = append(logFields, zap.Uint64("conn_id", info.ID)) + } + if len(info.User) > 0 { + logFields = append(logFields, zap.String("user", info.User)) + } + if len(info.DB) > 0 { + logFields = append(logFields, zap.String("database", info.DB)) + } + var tableIDs, indexNames string + if len(info.StmtCtx.TableIDs) > 0 { + tableIDs = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.TableIDs), " ", ",", -1) + logFields = append(logFields, zap.String("table_ids", tableIDs)) + } + if len(info.StmtCtx.IndexNames) > 0 { + indexNames = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.IndexNames), " ", ",", -1) + logFields = append(logFields, zap.String("index_names", indexNames)) + } + logFields = append(logFields, zap.Uint64("txn_start_ts", info.CurTxnStartTS)) + if memTracker := info.MemTracker; memTracker != nil { + logFields = append(logFields, zap.String("mem_max", fmt.Sprintf("%d Bytes (%v)", memTracker.MaxConsumed(), memTracker.FormatBytes(memTracker.MaxConsumed())))) + } + + const logSQLLen = 1024 * 8 + var sql string + if len(info.Info) > 0 { + sql = info.Info + if info.RedactSQL { + sql = parser.Normalize(sql) + } + } + if len(sql) > logSQLLen && needTruncateSQL { + sql = fmt.Sprintf("%s len(%d)", sql[:logSQLLen], len(sql)) + } + logFields = append(logFields, zap.String("sql", sql)) + return logFields +} +>>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) diff --git a/util/util_test.go b/util/util_test.go new file mode 100644 index 0000000000000..ca68a55cd8ba6 --- /dev/null +++ b/util/util_test.go @@ -0,0 +1,74 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "testing" + "time" + + "github.com/pingcap/tidb/sessionctx/stmtctx" + "github.com/pingcap/tidb/util/memory" + "github.com/stretchr/testify/assert" +) + +func TestLogFormat(t *testing.T) { + mem := memory.NewTracker(-1, -1) + mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) + mockTooLongQuery := make([]byte, 1024*9) + + var refCount stmtctx.ReferenceCount = 0 + info := &ProcessInfo{ + ID: 233, + User: "PingCAP", + Host: "127.0.0.1", + DB: "Database", + Info: "select * from table where a > 1", + CurTxnStartTS: 23333, + StatsInfo: func(interface{}) map[string]uint64 { + return nil + }, + StmtCtx: &stmtctx.StatementContext{}, + RefCountOfStmtCtx: &refCount, + MemTracker: mem, + RedactSQL: false, + } + costTime := time.Second * 233 + logSQLTruncateLen := 1024 * 8 + logFields := GenLogFields(costTime, info, true) + + assert.Len(t, logFields, 7) + assert.Equal(t, "cost_time", logFields[0].Key) + assert.Equal(t, "233s", logFields[0].String) + assert.Equal(t, "conn_id", logFields[1].Key) + assert.Equal(t, int64(233), logFields[1].Integer) + assert.Equal(t, "user", logFields[2].Key) + assert.Equal(t, "PingCAP", logFields[2].String) + assert.Equal(t, "database", logFields[3].Key) + assert.Equal(t, "Database", logFields[3].String) + assert.Equal(t, "txn_start_ts", logFields[4].Key) + assert.Equal(t, int64(23333), logFields[4].Integer) + assert.Equal(t, "mem_max", logFields[5].Key) + assert.Equal(t, "2013265920 Bytes (1.88 GB)", logFields[5].String) + assert.Equal(t, "sql", logFields[6].Key) + assert.Equal(t, "select * from table where a > 1", logFields[6].String) + + logFields = GenLogFields(costTime, info, true) + assert.Equal(t, "select * from table where a > 1", logFields[6].String) + info.Info = string(mockTooLongQuery) + logFields = GenLogFields(costTime, info, true) + assert.Equal(t, len(logFields[6].String), logSQLTruncateLen+10) + logFields = GenLogFields(costTime, info, false) + assert.Equal(t, len(logFields[6].String), len(mockTooLongQuery)) +} From 7e6f16e3c540b6533468a621f0ad283f51f627d9 Mon Sep 17 00:00:00 2001 From: cbcwestwolf <1004626265@qq.com> Date: Mon, 26 Dec 2022 15:56:18 +0800 Subject: [PATCH 2/2] Resolve conflict --- session/session.go | 53 +-- sessionctx/stmtctx/stmtctx.go | 40 -- sessionctx/variable/session.go | 13 +- util/expensivequery/expensivequerey_test.go | 4 +- util/expensivequery/expensivequery.go | 13 +- util/memoryusagealarm/memoryusagealarm.go | 380 ------------------ .../memoryusagealarm/memoryusagealarm_test.go | 145 ------- util/processinfo.go | 50 +-- util/util.go | 100 ----- util/util_test.go | 74 ---- 10 files changed, 42 insertions(+), 830 deletions(-) delete mode 100644 util/memoryusagealarm/memoryusagealarm.go delete mode 100644 util/memoryusagealarm/memoryusagealarm_test.go delete mode 100644 util/util_test.go diff --git a/session/session.go b/session/session.go index b2ccbfcf3d1a1..999215977d26a 100644 --- a/session/session.go +++ b/session/session.go @@ -1394,43 +1394,22 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu p = explain.TargetPlan } pi := util.ProcessInfo{ -<<<<<<< HEAD - ID: s.sessionVars.ConnectionID, - Port: s.sessionVars.Port, - DB: s.sessionVars.CurrentDB, - Command: command, - Plan: p, - PlanExplainRows: plannercore.GetExplainRowsForPlan(p), - RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, - Time: t, - State: s.Status(), - Info: sql, - CurTxnStartTS: curTxnStartTS, - StmtCtx: s.sessionVars.StmtCtx, - StatsInfo: plannercore.GetStatsInfo, - MaxExecutionTime: maxExecutionTime, - RedactSQL: s.sessionVars.EnableRedactLog, -======= - ID: s.sessionVars.ConnectionID, - Port: s.sessionVars.Port, - DB: s.sessionVars.CurrentDB, - Command: command, - Plan: p, - PlanExplainRows: plannercore.GetExplainRowsForPlan(p), - RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, - Time: t, - State: s.Status(), - Info: sql, - CurTxnStartTS: curTxnStartTS, - StmtCtx: s.sessionVars.StmtCtx, - RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, - MemTracker: s.sessionVars.MemTracker, - DiskTracker: s.sessionVars.DiskTracker, - StatsInfo: plannercore.GetStatsInfo, - OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), - MaxExecutionTime: maxExecutionTime, - RedactSQL: s.sessionVars.EnableRedactLog, ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) + ID: s.sessionVars.ConnectionID, + Port: s.sessionVars.Port, + DB: s.sessionVars.CurrentDB, + Command: command, + Plan: p, + PlanExplainRows: plannercore.GetExplainRowsForPlan(p), + RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, + Time: t, + State: s.Status(), + Info: sql, + CurTxnStartTS: curTxnStartTS, + StmtCtx: s.sessionVars.StmtCtx, + RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, + StatsInfo: plannercore.GetStatsInfo, + MaxExecutionTime: maxExecutionTime, + RedactSQL: s.sessionVars.EnableRedactLog, } oldPi := s.ShowProcess() if p == nil { diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 588342d146501..d5f19e8fe6dd5 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -60,45 +60,6 @@ type SQLWarn struct { Err error } -<<<<<<< HEAD -======= -type jsonSQLWarn struct { - Level string `json:"level"` - SQLErr *terror.Error `json:"err,omitempty"` - Msg string `json:"msg,omitempty"` -} - -// MarshalJSON implements the Marshaler.MarshalJSON interface. -func (warn *SQLWarn) MarshalJSON() ([]byte, error) { - w := &jsonSQLWarn{ - Level: warn.Level, - } - e := errors.Cause(warn.Err) - switch x := e.(type) { - case *terror.Error: - // Omit outter errors because only the most inner error matters. - w.SQLErr = x - default: - w.Msg = e.Error() - } - return json.Marshal(w) -} - -// UnmarshalJSON implements the Unmarshaler.UnmarshalJSON interface. -func (warn *SQLWarn) UnmarshalJSON(data []byte) error { - var w jsonSQLWarn - if err := json.Unmarshal(data, &w); err != nil { - return err - } - warn.Level = w.Level - if w.SQLErr != nil { - warn.Err = w.SQLErr - } else { - warn.Err = errors.New(w.Msg) - } - return nil -} - // ReferenceCount indicates the reference count of StmtCtx. type ReferenceCount int32 @@ -135,7 +96,6 @@ func (rf *ReferenceCount) UnFreeze() { atomic.StoreInt32((*int32)(rf), ReferenceCountNoReference) } ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) // StatementContext contains variables for a statement. // It should be reset before executing a statement. type StatementContext struct { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 7b4aebe422476..d77d14e351556 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1006,11 +1006,8 @@ type SessionVars struct { // ReadStaleness indicates the staleness duration for the following query ReadStaleness time.Duration - // cached is used to optimze the object allocation. - cached struct { - curr int8 - data [2]stmtctx.StatementContext - } + // cachedStmtCtx is used to optimze the object allocation. + cachedStmtCtx [2]stmtctx.StatementContext // Rng stores the rand_seed1 and rand_seed2 for Rand() function Rng *mathutil.MysqlRng @@ -1054,11 +1051,6 @@ type SessionVars struct { // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { -<<<<<<< HEAD - s.cached.curr = (s.cached.curr + 1) % 2 - s.cached.data[s.cached.curr] = stmtctx.StatementContext{} - return &s.cached.data[s.cached.curr] -======= sc := &s.cachedStmtCtx[0] if sc == s.StmtCtx { sc = &s.cachedStmtCtx[1] @@ -1070,7 +1062,6 @@ func (s *SessionVars) InitStatementContext() *stmtctx.StatementContext { sc = &stmtctx.StatementContext{} } return sc ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) } // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's diff --git a/util/expensivequery/expensivequerey_test.go b/util/expensivequery/expensivequerey_test.go index c1a8464fdfeb9..7f64a8406122e 100644 --- a/util/expensivequery/expensivequerey_test.go +++ b/util/expensivequery/expensivequerey_test.go @@ -39,6 +39,7 @@ func TestMain(m *testing.M) { func TestLogFormat(t *testing.T) { mem := memory.NewTracker(-1, -1) mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) + var refCount stmtctx.ReferenceCount = 0 info := &util.ProcessInfo{ ID: 233, User: "PingCAP", @@ -52,7 +53,8 @@ func TestLogFormat(t *testing.T) { StmtCtx: &stmtctx.StatementContext{ MemTracker: mem, }, - RedactSQL: false, + RefCountOfStmtCtx: &refCount, + RedactSQL: false, } costTime := time.Second * 233 logFields := genLogFields(costTime, info) diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 020afc9783c92..ce7cc44980240 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -119,6 +119,10 @@ func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { } func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { + if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { + return nil + } + defer info.RefCountOfStmtCtx.Decrease() logFields := make([]zap.Field, 0, 20) logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) execDetail := info.StmtCtx.GetExecDetails() @@ -184,15 +188,10 @@ func genLogFields(costTime time.Duration, info *util.ProcessInfo) []zap.Field { } // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. -<<<<<<< HEAD func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) { - logutil.BgLogger().Warn("expensive_query", genLogFields(costTime, info)...) -======= -func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo, msg string) { - fields := util.GenLogFields(costTime, info, true) + fields := genLogFields(costTime, info) if fields == nil { return } - logutil.BgLogger().Warn(msg, fields...) ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) + logutil.BgLogger().Warn("expensive_query", fields...) } diff --git a/util/memoryusagealarm/memoryusagealarm.go b/util/memoryusagealarm/memoryusagealarm.go deleted file mode 100644 index c8a6fd0eaecda..0000000000000 --- a/util/memoryusagealarm/memoryusagealarm.go +++ /dev/null @@ -1,380 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package memoryusagealarm - -import ( - "fmt" - "os" - "path/filepath" - rpprof "runtime/pprof" - "strings" - "sync/atomic" - "time" - - "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/disk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/memory" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "golang.org/x/exp/slices" -) - -// Handle is the handler for expensive query. -type Handle struct { - exitCh chan struct{} - sm atomic.Value -} - -// NewMemoryUsageAlarmHandle builds a memory usage alarm handler. -func NewMemoryUsageAlarmHandle(exitCh chan struct{}) *Handle { - return &Handle{exitCh: exitCh} -} - -// SetSessionManager sets the SessionManager which is used to fetching the info -// of all active sessions. -func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle { - eqh.sm.Store(sm) - return eqh -} - -// Run starts a memory usage alarm goroutine at the start time of the server. -func (eqh *Handle) Run() { - // use 100ms as tickInterval temply, may use given interval or use defined variable later - tickInterval := time.Millisecond * time.Duration(100) - ticker := time.NewTicker(tickInterval) - defer ticker.Stop() - sm := eqh.sm.Load().(util.SessionManager) - record := &memoryUsageAlarm{} - for { - select { - case <-ticker.C: - record.alarm4ExcessiveMemUsage(sm) - case <-eqh.exitCh: - return - } - } -} - -type memoryUsageAlarm struct { - lastCheckTime time.Time - lastUpdateVariableTime time.Time - err error - baseRecordDir string - lastRecordDirName []string - lastRecordMemUsed uint64 - memoryUsageAlarmRatio float64 - memoryUsageAlarmKeepRecordNum int64 - serverMemoryLimit uint64 - isServerMemoryLimitSet bool - initialized bool -} - -func (record *memoryUsageAlarm) updateVariable() { - if time.Since(record.lastUpdateVariableTime) < 60*time.Second { - return - } - record.memoryUsageAlarmRatio = variable.MemoryUsageAlarmRatio.Load() - record.memoryUsageAlarmKeepRecordNum = variable.MemoryUsageAlarmKeepRecordNum.Load() - record.serverMemoryLimit = memory.ServerMemoryLimit.Load() - if record.serverMemoryLimit != 0 { - record.isServerMemoryLimitSet = true - } else { - record.serverMemoryLimit, record.err = memory.MemTotal() - if record.err != nil { - logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err)) - return - } - record.isServerMemoryLimitSet = false - } - record.lastUpdateVariableTime = time.Now() -} - -func (record *memoryUsageAlarm) initMemoryUsageAlarmRecord() { - record.lastCheckTime = time.Time{} - record.lastUpdateVariableTime = time.Time{} - record.updateVariable() - tidbLogDir, _ := filepath.Split(config.GetGlobalConfig().Log.File.Filename) - record.baseRecordDir = filepath.Join(tidbLogDir, "oom_record") - if record.err = disk.CheckAndCreateDir(record.baseRecordDir); record.err != nil { - return - } - // Read last records - recordDirs, err := os.ReadDir(record.baseRecordDir) - if err != nil { - record.err = err - return - } - for _, dir := range recordDirs { - name := filepath.Join(record.baseRecordDir, dir.Name()) - if strings.Contains(dir.Name(), "record") { - record.lastRecordDirName = append(record.lastRecordDirName, name) - } - } - record.initialized = true -} - -// If Performance.ServerMemoryQuota is set, use `ServerMemoryQuota * MemoryUsageAlarmRatio` to check oom risk. -// If Performance.ServerMemoryQuota is not set, use `system total memory size * MemoryUsageAlarmRatio` to check oom risk. -func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm util.SessionManager) { - if !record.initialized { - record.initMemoryUsageAlarmRecord() - if record.err != nil { - return - } - } else { - record.updateVariable() - } - if record.memoryUsageAlarmRatio <= 0.0 || record.memoryUsageAlarmRatio >= 1.0 { - return - } - var memoryUsage uint64 - instanceStats := memory.ReadMemStats() - if record.isServerMemoryLimitSet { - memoryUsage = instanceStats.HeapAlloc - } else { - memoryUsage, record.err = memory.MemUsed() - if record.err != nil { - logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err)) - return - } - } - - // TODO: Consider NextGC to record SQLs. - if needRecord, reason := record.needRecord(memoryUsage); needRecord { - record.lastCheckTime = time.Now() - record.lastRecordMemUsed = memoryUsage - record.doRecord(memoryUsage, instanceStats.HeapAlloc, sm, reason) - record.tryRemoveRedundantRecords() - } -} - -// AlarmReason implements alarm reason. -type AlarmReason uint - -const ( - // GrowTooFast is the reason that memory increasing too fast. - GrowTooFast AlarmReason = iota - // ExceedAlarmRatio is the reason that memory used exceed threshold. - ExceedAlarmRatio - // NoReason means no alarm - NoReason -) - -func (reason AlarmReason) String() string { - return [...]string{"memory usage grows too fast", "memory usage exceeds alarm ratio", "no reason"}[reason] -} - -func (record *memoryUsageAlarm) needRecord(memoryUsage uint64) (bool, AlarmReason) { - // At least 60 seconds between two recordings that memory usage is less than threshold (default 70% system memory). - // If the memory is still exceeded, only records once. - // If the memory used ratio recorded this time is 0.1 higher than last time, we will force record this time. - if float64(memoryUsage) <= float64(record.serverMemoryLimit)*record.memoryUsageAlarmRatio { - return false, NoReason - } - - interval := time.Since(record.lastCheckTime) - memDiff := int64(memoryUsage) - int64(record.lastRecordMemUsed) - if interval > 60*time.Second { - return true, ExceedAlarmRatio - } - if float64(memDiff) > 0.1*float64(record.serverMemoryLimit) { - return true, GrowTooFast - } - return false, NoReason -} - -func (record *memoryUsageAlarm) doRecord(memUsage uint64, instanceMemoryUsage uint64, sm util.SessionManager, alarmReason AlarmReason) { - fields := make([]zap.Field, 0, 6) - fields = append(fields, zap.Bool("is tidb_server_memory_limit set", record.isServerMemoryLimitSet)) - if record.isServerMemoryLimitSet { - fields = append(fields, zap.Any("tidb_server_memory_limit", record.serverMemoryLimit)) - fields = append(fields, zap.Any("tidb-server memory usage", memUsage)) - } else { - fields = append(fields, zap.Any("system memory total", record.serverMemoryLimit)) - fields = append(fields, zap.Any("system memory usage", memUsage)) - fields = append(fields, zap.Any("tidb-server memory usage", instanceMemoryUsage)) - } - fields = append(fields, zap.Any("memory-usage-alarm-ratio", record.memoryUsageAlarmRatio)) - fields = append(fields, zap.Any("record path", record.baseRecordDir)) - logutil.BgLogger().Warn(fmt.Sprintf("tidb-server has the risk of OOM because of %s. Running SQLs and heap profile will be recorded in record path", alarmReason.String()), fields...) - recordDir := filepath.Join(record.baseRecordDir, "record"+record.lastCheckTime.Format(time.RFC3339)) - if record.err = disk.CheckAndCreateDir(recordDir); record.err != nil { - return - } - record.lastRecordDirName = append(record.lastRecordDirName, recordDir) - if record.err = record.recordSQL(sm, recordDir); record.err != nil { - return - } - if record.err = record.recordProfile(recordDir); record.err != nil { - return - } -} - -func (record *memoryUsageAlarm) tryRemoveRedundantRecords() { - filename := &record.lastRecordDirName - for len(*filename) > int(record.memoryUsageAlarmKeepRecordNum) { - err := os.RemoveAll((*filename)[0]) - if err != nil { - logutil.BgLogger().Error("remove temp files failed", zap.Error(err)) - } - *filename = (*filename)[1:] - } -} - -func getPlanString(info *util.ProcessInfo) string { - var buf strings.Builder - rows := info.PlanExplainRows - buf.WriteString(fmt.Sprintf("|%v|%v|%v|%v|%v|", "id", "estRows", "task", "access object", "operator info")) - for _, row := range rows { - buf.WriteString(fmt.Sprintf("\n|%v|%v|%v|%v|%v|", row[0], row[1], row[2], row[3], row[4])) - } - return buf.String() -} - -func (record *memoryUsageAlarm) printTop10SqlInfo(pinfo []*util.ProcessInfo, f *os.File) { - if _, err := f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n"); err != nil { - logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) - } - memBuf := record.getTop10SqlInfoByMemoryUsage(pinfo) - if _, err := f.WriteString(memBuf.String()); err != nil { - logutil.BgLogger().Error("write top 10 memory sql info fail", zap.Error(err)) - } - if _, err := f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n"); err != nil { - logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) - } - costBuf := record.getTop10SqlInfoByCostTime(pinfo) - if _, err := f.WriteString(costBuf.String()); err != nil { - logutil.BgLogger().Error("write top 10 time cost sql info fail", zap.Error(err)) - } -} - -func (record *memoryUsageAlarm) getTop10SqlInfo(cmp func(i, j *util.ProcessInfo) bool, pinfo []*util.ProcessInfo) strings.Builder { - slices.SortFunc(pinfo, cmp) - list := pinfo - var buf strings.Builder - oomAction := variable.OOMAction.Load() - serverMemoryLimit := memory.ServerMemoryLimit.Load() - for i, totalCnt := 0, 10; i < len(list) && totalCnt > 0; i++ { - info := list[i] - buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) - fields := util.GenLogFields(record.lastCheckTime.Sub(info.Time), info, false) - if fields == nil { - continue - } - fields = append(fields, zap.String("tidb_mem_oom_action", oomAction)) - fields = append(fields, zap.Uint64("tidb_server_memory_limit", serverMemoryLimit)) - fields = append(fields, zap.Int64("tidb_mem_quota_query", info.OOMAlarmVariablesInfo.SessionMemQuotaQuery)) - fields = append(fields, zap.Int("tidb_analyze_version", info.OOMAlarmVariablesInfo.SessionAnalyzeVersion)) - fields = append(fields, zap.Bool("tidb_enable_rate_limit_action", info.OOMAlarmVariablesInfo.SessionEnabledRateLimitAction)) - fields = append(fields, zap.String("current_analyze_plan", getPlanString(info))) - for _, field := range fields { - switch field.Type { - case zapcore.StringType: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.String)) - case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, uint64(field.Integer))) - case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer)) - case zapcore.BoolType: - buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer == 1)) - } - buf.WriteString("\n") - } - totalCnt-- - } - buf.WriteString("\n") - return buf -} - -func (record *memoryUsageAlarm) getTop10SqlInfoByMemoryUsage(pinfo []*util.ProcessInfo) strings.Builder { - return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { - return i.MemTracker.MaxConsumed() > j.MemTracker.MaxConsumed() - }, pinfo) -} - -func (record *memoryUsageAlarm) getTop10SqlInfoByCostTime(pinfo []*util.ProcessInfo) strings.Builder { - return record.getTop10SqlInfo(func(i, j *util.ProcessInfo) bool { - return i.Time.Before(j.Time) - }, pinfo) -} - -func (record *memoryUsageAlarm) recordSQL(sm util.SessionManager, recordDir string) error { - processInfo := sm.ShowProcessList() - pinfo := make([]*util.ProcessInfo, 0, len(processInfo)) - for _, info := range processInfo { - if len(info.Info) != 0 { - pinfo = append(pinfo, info) - } - } - fileName := filepath.Join(recordDir, "running_sql") - f, err := os.Create(fileName) - if err != nil { - logutil.BgLogger().Error("create oom record file fail", zap.Error(err)) - return err - } - defer func() { - err := f.Close() - if err != nil { - logutil.BgLogger().Error("close oom record file fail", zap.Error(err)) - } - }() - record.printTop10SqlInfo(pinfo, f) - return nil -} - -type item struct { - Name string - Debug int -} - -func (*memoryUsageAlarm) recordProfile(recordDir string) error { - items := []item{ - {Name: "heap"}, - {Name: "goroutine", Debug: 2}, - } - for _, item := range items { - if err := write(item, recordDir); err != nil { - return err - } - } - return nil -} - -func write(item item, recordDir string) error { - fileName := filepath.Join(recordDir, item.Name) - f, err := os.Create(fileName) - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.Name), zap.Error(err)) - return err - } - p := rpprof.Lookup(item.Name) - err = p.WriteTo(f, item.Debug) - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.Name), zap.Error(err)) - return err - } - - //nolint: revive - defer func() { - err := f.Close() - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.Name), zap.Error(err)) - } - }() - return nil -} diff --git a/util/memoryusagealarm/memoryusagealarm_test.go b/util/memoryusagealarm/memoryusagealarm_test.go deleted file mode 100644 index 6e5147805676f..0000000000000 --- a/util/memoryusagealarm/memoryusagealarm_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package memoryusagealarm - -import ( - "testing" - "time" - - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util" - "github.com/pingcap/tidb/util/memory" - "github.com/stretchr/testify/assert" -) - -func TestIfNeedDoRecord(t *testing.T) { - record := memoryUsageAlarm{} - record.initMemoryUsageAlarmRecord() - - // mem usage ratio < 70% will not be recorded - memUsed := 0.69 * float64(record.serverMemoryLimit) - needRecord, reason := record.needRecord(uint64(memUsed)) - assert.False(t, needRecord) - assert.Equal(t, NoReason, reason) - - // mem usage ratio > 70% will not be recorded - memUsed = 0.71 * float64(record.serverMemoryLimit) - needRecord, reason = record.needRecord(uint64(memUsed)) - assert.True(t, needRecord) - assert.Equal(t, ExceedAlarmRatio, reason) - record.lastCheckTime = time.Now() - record.lastRecordMemUsed = uint64(memUsed) - - // check time - last record time < 60s will not be recorded - memUsed = 0.71 * float64(record.serverMemoryLimit) - needRecord, reason = record.needRecord(uint64(memUsed)) - assert.False(t, needRecord) - assert.Equal(t, NoReason, reason) - - // check time - last record time > 60s will be recorded - record.lastCheckTime = record.lastCheckTime.Add(-60 * time.Second) - memUsed = 0.71 * float64(record.serverMemoryLimit) - needRecord, reason = record.needRecord(uint64(memUsed)) - assert.True(t, needRecord) - assert.Equal(t, ExceedAlarmRatio, reason) - record.lastCheckTime = time.Now() - record.lastRecordMemUsed = uint64(memUsed) - - // mem usage ratio - last mem usage ratio < 10% will not be recorded - memUsed = 0.80 * float64(record.serverMemoryLimit) - needRecord, reason = record.needRecord(uint64(memUsed)) - assert.False(t, needRecord) - assert.Equal(t, NoReason, reason) - - // mem usage ratio - last mem usage ratio > 10% will not be recorded even though check time - last record time - memUsed = 0.82 * float64(record.serverMemoryLimit) - needRecord, reason = record.needRecord(uint64(memUsed)) - assert.True(t, needRecord) - assert.Equal(t, GrowTooFast, reason) -} - -func genTime(sec int64) time.Time { - minStartTime := time.Date(1970, 1, 0, 0, 0, 0, 0, time.UTC).Unix() - return time.Unix(minStartTime+sec, 0) -} - -func TestGetTop10Sql(t *testing.T) { - record := memoryUsageAlarm{} - record.initMemoryUsageAlarmRecord() - record.lastCheckTime = genTime(123456) - - processInfoList := genMockProcessInfoList([]int64{1000, 87263523, 34223}, - []time.Time{genTime(1234), genTime(123456), genTime(12)}, - 3) - actual := record.getTop10SqlInfoByMemoryUsage(processInfoList) - assert.Equal(t, "SQL 0: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", - actual.String()) - actual = record.getTop10SqlInfoByCostTime(processInfoList) - assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) - - processInfoList = genMockProcessInfoList([]int64{1000, 87263523, 34223, 532355, 123225151, 231231515, 12312, 12515134234, 232, 12414, 15263236, 123123123, 15}, - []time.Time{genTime(1234), genTime(123456), genTime(12), genTime(3241), genTime(12515), genTime(3215), genTime(61314), genTime(12234), genTime(1123), genTime(512), genTime(11111), genTime(22222), genTime(5512)}, - 13) - actual = record.getTop10SqlInfoByMemoryUsage(processInfoList) - assert.Equal(t, "SQL 0: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 101234s\ntxn_start_ts: 0\nmem_max: 123123123 Bytes (117.4 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 0s\ntxn_start_ts: 0\nmem_max: 87263523 Bytes (83.2 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 62142s\ntxn_start_ts: 0\nmem_max: 12312 Bytes (12.0 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) - actual = record.getTop10SqlInfoByCostTime(processInfoList) - assert.Equal(t, "SQL 0: \ncost_time: 123444s\ntxn_start_ts: 0\nmem_max: 34223 Bytes (33.4 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 1: \ncost_time: 122944s\ntxn_start_ts: 0\nmem_max: 12414 Bytes (12.1 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 2: \ncost_time: 122333s\ntxn_start_ts: 0\nmem_max: 232 Bytes (232 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 3: \ncost_time: 122222s\ntxn_start_ts: 0\nmem_max: 1000 Bytes (1000 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 4: \ncost_time: 120241s\ntxn_start_ts: 0\nmem_max: 231231515 Bytes (220.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 5: \ncost_time: 120215s\ntxn_start_ts: 0\nmem_max: 532355 Bytes (519.9 KB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 6: \ncost_time: 117944s\ntxn_start_ts: 0\nmem_max: 15 Bytes (15 Bytes)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 7: \ncost_time: 112345s\ntxn_start_ts: 0\nmem_max: 15263236 Bytes (14.6 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 8: \ncost_time: 111222s\ntxn_start_ts: 0\nmem_max: 12515134234 Bytes (11.7 GB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\nSQL 9: \ncost_time: 110941s\ntxn_start_ts: 0\nmem_max: 123225151 Bytes (117.5 MB)\nsql: \ntidb_mem_oom_action: CANCEL\ntidb_server_memory_limit: 0\ntidb_mem_quota_query: 0\ntidb_analyze_version: 0\ntidb_enable_rate_limit_action: false\ncurrent_analyze_plan: |id|estRows|task|access object|operator info|\n\n", actual.String()) -} - -func genMockProcessInfoList(memConsumeList []int64, startTimeList []time.Time, size int) []*util.ProcessInfo { - processInfoList := make([]*util.ProcessInfo, 0, size) - for i := 0; i < size; i++ { - tracker := memory.NewTracker(0, 0) - tracker.Consume(memConsumeList[i]) - var stmtCtxRefCount stmtctx.ReferenceCount = 0 - processInfo := util.ProcessInfo{Time: startTimeList[i], - StmtCtx: &stmtctx.StatementContext{}, - MemTracker: tracker, - StatsInfo: func(interface{}) map[string]uint64 { - return map[string]uint64{} - }, - RefCountOfStmtCtx: &stmtCtxRefCount, - } - processInfoList = append(processInfoList, &processInfo) - } - return processInfoList -} - -func TestUpdateVariables(t *testing.T) { - variable.MemoryUsageAlarmRatio.Store(0.3) - variable.MemoryUsageAlarmKeepRecordNum.Store(3) - memory.ServerMemoryLimit.Store(1024) - - record := memoryUsageAlarm{} - - record.initMemoryUsageAlarmRecord() - assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) - assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) - assert.Equal(t, uint64(1024), record.serverMemoryLimit) - variable.MemoryUsageAlarmRatio.Store(0.6) - variable.MemoryUsageAlarmKeepRecordNum.Store(6) - memory.ServerMemoryLimit.Store(2048) - - record.updateVariable() - assert.Equal(t, 0.3, record.memoryUsageAlarmRatio) - assert.Equal(t, int64(3), record.memoryUsageAlarmKeepRecordNum) - assert.Equal(t, uint64(1024), record.serverMemoryLimit) - record.lastUpdateVariableTime = record.lastUpdateVariableTime.Add(-60 * time.Second) - record.updateVariable() - assert.Equal(t, 0.6, record.memoryUsageAlarmRatio) - assert.Equal(t, int64(6), record.memoryUsageAlarmKeepRecordNum) - assert.Equal(t, uint64(2048), record.serverMemoryLimit) -} diff --git a/util/processinfo.go b/util/processinfo.go index 3c0567cf7e399..996136d77619e 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,41 +31,21 @@ import ( // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { -<<<<<<< HEAD - ID uint64 - User string - Host string - Port string - DB string - Digest string - Plan interface{} - PlanExplainRows [][]string - RuntimeStatsColl *execdetails.RuntimeStatsColl - Time time.Time - Info string - CurTxnStartTS uint64 - StmtCtx *stmtctx.StatementContext - StatsInfo func(interface{}) map[string]uint64 -======= - Time time.Time - Plan interface{} - StmtCtx *stmtctx.StatementContext - RefCountOfStmtCtx *stmtctx.ReferenceCount - MemTracker *memory.Tracker - DiskTracker *disk.Tracker - StatsInfo func(interface{}) map[string]uint64 - RuntimeStatsColl *execdetails.RuntimeStatsColl - DB string - Digest string - Host string - User string - Info string - Port string - PlanExplainRows [][]string - OOMAlarmVariablesInfo OOMAlarmVariablesInfo - ID uint64 - CurTxnStartTS uint64 ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) + ID uint64 + User string + Host string + Port string + DB string + Digest string + Plan interface{} + PlanExplainRows [][]string + RuntimeStatsColl *execdetails.RuntimeStatsColl + Time time.Time + Info string + CurTxnStartTS uint64 + StmtCtx *stmtctx.StatementContext + RefCountOfStmtCtx *stmtctx.ReferenceCount + StatsInfo func(interface{}) map[string]uint64 // MaxExecutionTime is the timeout for select statement, in milliseconds. // If the query takes too long, kill it. MaxExecutionTime uint64 diff --git a/util/util.go b/util/util.go index d37192db03c78..9b1a9b52840fc 100644 --- a/util/util.go +++ b/util/util.go @@ -71,103 +71,3 @@ func GetJSON(client *http.Client, url string, v interface{}) error { return errors.Trace(json.NewDecoder(resp.Body).Decode(v)) } -<<<<<<< HEAD -======= - -// ChanMap creates a channel which applies the function over the input Channel. -// Hint of Resource Leakage: -// In golang, channel isn't an interface so we must create a goroutine for handling the inputs. -// Hence the input channel must be closed properly or this function may leak a goroutine. -func ChanMap[T, R any](c <-chan T, f func(T) R) <-chan R { - outCh := make(chan R) - go func() { - defer close(outCh) - for item := range c { - outCh <- f(item) - } - }() - return outCh -} - -// Str2Int64Map converts a string to a map[int64]struct{}. -func Str2Int64Map(str string) map[int64]struct{} { - strs := strings.Split(str, ",") - res := make(map[int64]struct{}, len(strs)) - for _, s := range strs { - id, _ := strconv.ParseInt(s, 10, 64) - res[id] = struct{}{} - } - return res -} - -// GenLogFields generate log fields. -func GenLogFields(costTime time.Duration, info *ProcessInfo, needTruncateSQL bool) []zap.Field { - if info.RefCountOfStmtCtx != nil && !info.RefCountOfStmtCtx.TryIncrease() { - return nil - } - defer info.RefCountOfStmtCtx.Decrease() - - logFields := make([]zap.Field, 0, 20) - logFields = append(logFields, zap.String("cost_time", strconv.FormatFloat(costTime.Seconds(), 'f', -1, 64)+"s")) - execDetail := info.StmtCtx.GetExecDetails() - logFields = append(logFields, execDetail.ToZapFields()...) - if copTaskInfo := info.StmtCtx.CopTasksDetails(); copTaskInfo != nil { - logFields = append(logFields, copTaskInfo.ToZapFields()...) - } - if statsInfo := info.StatsInfo(info.Plan); len(statsInfo) > 0 { - var buf strings.Builder - firstComma := false - vStr := "" - for k, v := range statsInfo { - if v == 0 { - vStr = "pseudo" - } else { - vStr = strconv.FormatUint(v, 10) - } - if firstComma { - buf.WriteString("," + k + ":" + vStr) - } else { - buf.WriteString(k + ":" + vStr) - firstComma = true - } - } - logFields = append(logFields, zap.String("stats", buf.String())) - } - if info.ID != 0 { - logFields = append(logFields, zap.Uint64("conn_id", info.ID)) - } - if len(info.User) > 0 { - logFields = append(logFields, zap.String("user", info.User)) - } - if len(info.DB) > 0 { - logFields = append(logFields, zap.String("database", info.DB)) - } - var tableIDs, indexNames string - if len(info.StmtCtx.TableIDs) > 0 { - tableIDs = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.TableIDs), " ", ",", -1) - logFields = append(logFields, zap.String("table_ids", tableIDs)) - } - if len(info.StmtCtx.IndexNames) > 0 { - indexNames = strings.Replace(fmt.Sprintf("%v", info.StmtCtx.IndexNames), " ", ",", -1) - logFields = append(logFields, zap.String("index_names", indexNames)) - } - logFields = append(logFields, zap.Uint64("txn_start_ts", info.CurTxnStartTS)) - if memTracker := info.MemTracker; memTracker != nil { - logFields = append(logFields, zap.String("mem_max", fmt.Sprintf("%d Bytes (%v)", memTracker.MaxConsumed(), memTracker.FormatBytes(memTracker.MaxConsumed())))) - } - - const logSQLLen = 1024 * 8 - var sql string - if len(info.Info) > 0 { - sql = info.Info - if info.RedactSQL { - sql = parser.Normalize(sql) - } - } - if len(sql) > logSQLLen && needTruncateSQL { - sql = fmt.Sprintf("%s len(%d)", sql[:logSQLLen], len(sql)) - } - logFields = append(logFields, zap.String("sql", sql)) - return logFields -} ->>>>>>> f8a6bde954 (*: add a reference count for StmtCtx (#39368)) diff --git a/util/util_test.go b/util/util_test.go deleted file mode 100644 index ca68a55cd8ba6..0000000000000 --- a/util/util_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "testing" - "time" - - "github.com/pingcap/tidb/sessionctx/stmtctx" - "github.com/pingcap/tidb/util/memory" - "github.com/stretchr/testify/assert" -) - -func TestLogFormat(t *testing.T) { - mem := memory.NewTracker(-1, -1) - mem.Consume(1<<30 + 1<<29 + 1<<28 + 1<<27) - mockTooLongQuery := make([]byte, 1024*9) - - var refCount stmtctx.ReferenceCount = 0 - info := &ProcessInfo{ - ID: 233, - User: "PingCAP", - Host: "127.0.0.1", - DB: "Database", - Info: "select * from table where a > 1", - CurTxnStartTS: 23333, - StatsInfo: func(interface{}) map[string]uint64 { - return nil - }, - StmtCtx: &stmtctx.StatementContext{}, - RefCountOfStmtCtx: &refCount, - MemTracker: mem, - RedactSQL: false, - } - costTime := time.Second * 233 - logSQLTruncateLen := 1024 * 8 - logFields := GenLogFields(costTime, info, true) - - assert.Len(t, logFields, 7) - assert.Equal(t, "cost_time", logFields[0].Key) - assert.Equal(t, "233s", logFields[0].String) - assert.Equal(t, "conn_id", logFields[1].Key) - assert.Equal(t, int64(233), logFields[1].Integer) - assert.Equal(t, "user", logFields[2].Key) - assert.Equal(t, "PingCAP", logFields[2].String) - assert.Equal(t, "database", logFields[3].Key) - assert.Equal(t, "Database", logFields[3].String) - assert.Equal(t, "txn_start_ts", logFields[4].Key) - assert.Equal(t, int64(23333), logFields[4].Integer) - assert.Equal(t, "mem_max", logFields[5].Key) - assert.Equal(t, "2013265920 Bytes (1.88 GB)", logFields[5].String) - assert.Equal(t, "sql", logFields[6].Key) - assert.Equal(t, "select * from table where a > 1", logFields[6].String) - - logFields = GenLogFields(costTime, info, true) - assert.Equal(t, "select * from table where a > 1", logFields[6].String) - info.Info = string(mockTooLongQuery) - logFields = GenLogFields(costTime, info, true) - assert.Equal(t, len(logFields[6].String), logSQLTruncateLen+10) - logFields = GenLogFields(costTime, info, false) - assert.Equal(t, len(logFields[6].String), len(mockTooLongQuery)) -}