diff --git a/config/config.go b/config/config.go index bb1d3daf94309..7b7c47319be30 100644 --- a/config/config.go +++ b/config/config.go @@ -371,22 +371,24 @@ type Status struct { // Performance is the performance section of the config. type Performance struct { - MaxProcs uint `toml:"max-procs" json:"max-procs"` - MaxMemory uint64 `toml:"max-memory" json:"max-memory"` - StatsLease string `toml:"stats-lease" json:"stats-lease"` - StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"` - FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"` - QueryFeedbackLimit uint `toml:"query-feedback-limit" json:"query-feedback-limit"` - PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"` - ForcePriority string `toml:"force-priority" json:"force-priority"` - BindInfoLease string `toml:"bind-info-lease" json:"bind-info-lease"` - TxnTotalSizeLimit uint64 `toml:"txn-total-size-limit" json:"txn-total-size-limit"` - TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"` - CrossJoin bool `toml:"cross-join" json:"cross-join"` - RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"` - DistinctAggPushDown bool `toml:"distinct-agg-push-down" json:"agg-push-down-join"` - CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"` - MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` + MaxProcs uint `toml:"max-procs" json:"max-procs"` + MaxMemory uint64 `toml:"max-memory" json:"max-memory"` + ServerMemoryQuota uint64 `toml:"server-memory-quota" json:"server-memory-quota"` + MemoryUsageAlarmRatio float64 `toml:"memory-usage-alarm-ratio" json:"memory-usage-alarm-ratio"` + StatsLease string `toml:"stats-lease" json:"stats-lease"` + StmtCountLimit uint `toml:"stmt-count-limit" json:"stmt-count-limit"` + FeedbackProbability float64 `toml:"feedback-probability" json:"feedback-probability"` + QueryFeedbackLimit uint `toml:"query-feedback-limit" json:"query-feedback-limit"` + PseudoEstimateRatio float64 `toml:"pseudo-estimate-ratio" json:"pseudo-estimate-ratio"` + ForcePriority string `toml:"force-priority" json:"force-priority"` + BindInfoLease string `toml:"bind-info-lease" json:"bind-info-lease"` + TxnTotalSizeLimit uint64 `toml:"txn-total-size-limit" json:"txn-total-size-limit"` + TCPKeepAlive bool `toml:"tcp-keep-alive" json:"tcp-keep-alive"` + CrossJoin bool `toml:"cross-join" json:"cross-join"` + RunAutoAnalyze bool `toml:"run-auto-analyze" json:"run-auto-analyze"` + DistinctAggPushDown bool `toml:"distinct-agg-push-down" json:"agg-push-down-join"` + CommitterConcurrency int `toml:"committer-concurrency" json:"committer-concurrency"` + MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` } // PlanCache is the PlanCache section of the config. @@ -608,21 +610,23 @@ var defaultConf = Config{ RecordQPSbyDB: false, }, Performance: Performance{ - MaxMemory: 0, - TCPKeepAlive: true, - CrossJoin: true, - StatsLease: "3s", - RunAutoAnalyze: true, - StmtCountLimit: 5000, - FeedbackProbability: 0.0, - QueryFeedbackLimit: 512, - PseudoEstimateRatio: 0.8, - ForcePriority: "NO_PRIORITY", - BindInfoLease: "3s", - TxnTotalSizeLimit: DefTxnTotalSizeLimit, - DistinctAggPushDown: false, - CommitterConcurrency: 16, - MaxTxnTTL: 10 * 60 * 1000, // 10min + MaxMemory: 0, + ServerMemoryQuota: 0, + MemoryUsageAlarmRatio: 0.8, + TCPKeepAlive: true, + CrossJoin: true, + StatsLease: "3s", + RunAutoAnalyze: true, + StmtCountLimit: 5000, + FeedbackProbability: 0.0, + QueryFeedbackLimit: 512, + PseudoEstimateRatio: 0.8, + ForcePriority: "NO_PRIORITY", + BindInfoLease: "3s", + TxnTotalSizeLimit: DefTxnTotalSizeLimit, + DistinctAggPushDown: false, + CommitterConcurrency: 16, + MaxTxnTTL: 10 * 60 * 1000, // 10min }, ProxyProtocol: ProxyProtocol{ Networks: "", @@ -862,6 +866,10 @@ func (c *Config) Valid() error { return fmt.Errorf("txn-total-size-limit should be less than %d", 10<<30) } + if c.Performance.MemoryUsageAlarmRatio > 1 || c.Performance.MemoryUsageAlarmRatio < 0 { + return fmt.Errorf("memory-usage-alarm-ratio in [Performance] must be greater than or equal to 0 and less than or equal to 1") + } + if c.StmtSummary.MaxStmtCount <= 0 { return fmt.Errorf("max-stmt-count in [stmt-summary] should be greater than 0") } diff --git a/config/config.toml.example b/config/config.toml.example index 9c527909c6dde..6556eee05ef1f 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -210,6 +210,17 @@ max-procs = 0 # Max memory size to use, 0 use the total usable memory in the machine. max-memory = 0 +# Memory size quota for tidb server, 0 means unlimited +server-memory-quota = 0 + +# The alarm threshold when memory usage of the tidb-server exceeds. The valid value range is greater than or equal to 0 +# and less than or equal to 1. The default value is 0.8. +# If this configuration is set to 0 or 1, it'll disable the alarm. +# Otherwise, related information will be recorded in the directory `tmp-storage-path/record`. +# Note: If the configuration `server-memory-quota` is set and larger than 0, the alarm threshold will be +# `memory-usage-alarm-ratio * server-memory-quota`; otherwise, it'll be `memory-usage-alarm-ratio * system memory size`. +memory-usage-alarm-ratio = 0.8 + # StmtCountLimit limits the max count of statement inside a transaction. stmt-count-limit = 5000 diff --git a/util/disk/tempDir.go b/util/disk/tempDir.go index 3c182c0311f67..1b05d4c47bf1a 100644 --- a/util/disk/tempDir.go +++ b/util/disk/tempDir.go @@ -32,6 +32,11 @@ var ( sf singleflight.Group ) +const ( + lockFile = "_dir.lock" + recordDir = "record" +) + // CheckAndInitTempDir check whether the temp directory is existed. // If not, initializes the temp directory. func CheckAndInitTempDir() (err error) { @@ -64,7 +69,6 @@ func InitializeTempDir() error { return err } } - lockFile := "_dir.lock" tempDirLock, err = fslock.Lock(filepath.Join(tempDir, lockFile)) if err != nil { switch err { @@ -77,17 +81,27 @@ func InitializeTempDir() error { return err } + // Create dir for MemoryUsageAlarmRecord. + _, err = os.Stat(filepath.Join(tempDir, "record")) + if err != nil && !os.IsExist(err) { + err = os.MkdirAll(filepath.Join(tempDir, "record"), 0755) + if err != nil { + return err + } + } + subDirs, err := ioutil.ReadDir(tempDir) if err != nil { return err } // If it exists others files except lock file, creates another goroutine to clean them. - if len(subDirs) > 1 { + if len(subDirs) > 2 { go func() { for _, subDir := range subDirs { // Do not remove the lock file. - if subDir.Name() == lockFile { + switch subDir.Name() { + case lockFile, recordDir: continue } err := os.RemoveAll(filepath.Join(tempDir, subDir.Name())) diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index 8ab115e900ce6..c77a3f99ddc86 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -54,6 +54,7 @@ func (eqh *Handle) Run() { ticker := time.NewTicker(tickInterval) defer ticker.Stop() sm := eqh.sm.Load().(util.SessionManager) + record := initMemoryUsageAlarmRecord() for { select { case <-ticker.C: @@ -73,6 +74,9 @@ func (eqh *Handle) Run() { } } threshold = atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold) + if record.err == nil { + record.alarm4ExcessiveMemUsage(sm) + } case <-eqh.exitCh: return } diff --git a/util/expensivequery/memory_usage_alarm.go b/util/expensivequery/memory_usage_alarm.go new file mode 100644 index 0000000000000..6824bc4a230b3 --- /dev/null +++ b/util/expensivequery/memory_usage_alarm.go @@ -0,0 +1,240 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package expensivequery + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + rpprof "runtime/pprof" + "sort" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/disk" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type memoryUsageAlarm struct { + err error + isServerMemoryQuotaSet bool + serverMemoryQuota uint64 + lastCheckTime time.Time + + tmpDir string + lastLogFileName []string + lastProfileFileName [][]string // heap, goroutine +} + +func initMemoryUsageAlarmRecord() (record *memoryUsageAlarm) { + record = &memoryUsageAlarm{} + if alert := config.GetGlobalConfig().Performance.MemoryUsageAlarmRatio; alert == 0 || alert == 1 { + record.err = errors.New("close memory usage alarm recorder") + return + } + if quota := config.GetGlobalConfig().Performance.ServerMemoryQuota; quota != 0 { + record.serverMemoryQuota = quota + record.isServerMemoryQuotaSet = true + } else { + // TODO: Get the memory info in container directly. + record.serverMemoryQuota, record.err = memory.MemTotal() + if record.err != nil { + logutil.BgLogger().Error("get system total memory fail", zap.Error(record.err)) + return + } + record.isServerMemoryQuotaSet = false + } + record.lastCheckTime = time.Time{} + record.tmpDir = filepath.Join(config.GetGlobalConfig().TempStoragePath, "record") + record.lastProfileFileName = make([][]string, 2) + // Read last records + files, err := ioutil.ReadDir(record.tmpDir) + if err != nil { + return record + } + for _, f := range files { + name := filepath.Join(record.tmpDir, f.Name()) + if strings.Contains(f.Name(), "running_sql") { + record.lastLogFileName = append(record.lastLogFileName, name) + } + if strings.Contains(f.Name(), "heap") { + record.lastProfileFileName[0] = append(record.lastProfileFileName[0], name) + } + if strings.Contains(f.Name(), "goroutine") { + record.lastProfileFileName[1] = append(record.lastProfileFileName[1], name) + } + } + + return record +} + +// If Performance.ServerMemoryQuota is set, use `ServerMemoryQuota * MemoryUsageAlarmRatio` to check oom risk. +// If Performance.ServerMemoryQuota is not set, use `system total memory size * MemoryUsageAlarmRatio` to check oom risk. +func (record *memoryUsageAlarm) alarm4ExcessiveMemUsage(sm util.SessionManager) { + var memoryUsage uint64 + instanceStats := &runtime.MemStats{} + if record.isServerMemoryQuotaSet { + runtime.ReadMemStats(instanceStats) + memoryUsage = instanceStats.HeapAlloc + } else { + memoryUsage, record.err = memory.MemUsed() + if record.err != nil { + logutil.BgLogger().Error("get system memory usage fail", zap.Error(record.err)) + return + } + } + + // TODO: Consider NextGC to record SQLs. + if float64(memoryUsage) > float64(record.serverMemoryQuota)*config.GetGlobalConfig().Performance.MemoryUsageAlarmRatio { + // At least ten seconds between two recordings that memory usage is less than threshold (default 80% system memory). + // If the memory is still exceeded, only records once. + interval := time.Since(record.lastCheckTime) + record.lastCheckTime = time.Now() + if interval > 10*time.Second { + record.doRecord(memoryUsage, sm) + } + } +} + +func (record *memoryUsageAlarm) doRecord(memUsage uint64, sm util.SessionManager) { + logutil.BgLogger().Warn("the TiDB instance now takes a lot of memory, has the risk of OOM", + zap.Bool("is server-momory-quota set", record.isServerMemoryQuotaSet), + zap.Any("memory size", record.serverMemoryQuota), + zap.Any("memory usage", memUsage), + zap.Any("memory-usage-alarm-ratio", config.GetGlobalConfig().Performance.MemoryUsageAlarmRatio), + ) + + if record.err = disk.CheckAndInitTempDir(); record.err != nil { + return + } + record.recordSQLAndSummaryTable(sm) + record.recordProfile() + + tryRemove := func(filename *[]string) { + // Keep the last 5 files + for len(*filename) > 5 { + err := os.Remove((*filename)[0]) + if err != nil { + logutil.BgLogger().Error("remove temp files failed", zap.Error(err)) + return + } + *filename = (*filename)[1:] + } + } + tryRemove(&record.lastLogFileName) + for i := range record.lastProfileFileName { + tryRemove(&record.lastProfileFileName[i]) + } +} + +func (record *memoryUsageAlarm) recordSQLAndSummaryTable(sm util.SessionManager) { + processInfo := sm.ShowProcessList() + pinfo := make([]*util.ProcessInfo, 0, len(processInfo)) + for _, info := range processInfo { + if len(info.Info) != 0 { + pinfo = append(pinfo, info) + } + } + + fileName := filepath.Join(record.tmpDir, "running_sql"+record.lastCheckTime.Format(time.RFC3339)) + record.lastLogFileName = append(record.lastLogFileName, fileName) + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error("create oom record file fail", zap.Error(err)) + return + } + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error("close oom record file fail", zap.Error(err)) + } + }() + printTop10 := func(cmp func(i, j int) bool) { + sort.Slice(pinfo, cmp) + list := pinfo + if len(list) > 10 { + list = list[:10] + } + var buf strings.Builder + for i, info := range list { + buf.WriteString(fmt.Sprintf("SQL %v: \n", i)) + fields := genLogFields(record.lastCheckTime.Sub(info.Time), info) + for _, field := range fields { + switch field.Type { + case zapcore.StringType: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.String)) + case zapcore.Uint8Type, zapcore.Uint16Type, zapcore.Uint32Type, zapcore.Uint64Type: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, uint64(field.Integer))) + case zapcore.Int8Type, zapcore.Int16Type, zapcore.Int32Type, zapcore.Int64Type: + buf.WriteString(fmt.Sprintf("%v: %v", field.Key, field.Integer)) + } + buf.WriteString("\n") + } + } + buf.WriteString("\n") + _, err = f.WriteString(buf.String()) + } + + _, err = f.WriteString("The 10 SQLs with the most memory usage for OOM analysis\n") + printTop10(func(i, j int) bool { + return pinfo[i].StmtCtx.MemTracker.MaxConsumed() > pinfo[j].StmtCtx.MemTracker.MaxConsumed() + }) + + _, err = f.WriteString("The 10 SQLs with the most time usage for OOM analysis\n") + printTop10(func(i, j int) bool { + return pinfo[i].Time.Before(pinfo[j].Time) + }) + + logutil.BgLogger().Info("record SQLs with the most memory usage or time usage", zap.Any("SQLs file path", fileName)) +} + +func (record *memoryUsageAlarm) recordProfile() { + items := []struct { + name string + debug int + }{ + {name: "heap"}, + {name: "goroutine", debug: 2}, + } + for i, item := range items { + fileName := filepath.Join(record.tmpDir, item.name+record.lastCheckTime.Format(time.RFC3339)) + record.lastProfileFileName[i] = append(record.lastProfileFileName[i], fileName) + f, err := os.Create(fileName) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("create %v profile file fail", item.name), zap.Error(err)) + return + } + defer func() { + err := f.Close() + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("close %v profile file fail", item.name), zap.Error(err)) + } + }() + p := rpprof.Lookup(item.name) + err = p.WriteTo(f, item.debug) + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("write %v profile file fail", item.name), zap.Error(err)) + return + } + logutil.BgLogger().Info(fmt.Sprintf("record %v profile successfully", item.name), zap.Any("Profile file path", fileName)) + } +}