diff --git a/config/config.go b/config/config.go index 13288f7c7c950..bb2d1b49b2f59 100644 --- a/config/config.go +++ b/config/config.go @@ -539,6 +539,9 @@ func (c *Config) Valid() error { return fmt.Errorf("invalid max log file size=%v which is larger than max=%v", c.Log.File.MaxSize, MaxLogFileSize) } c.OOMAction = strings.ToLower(c.OOMAction) + if c.OOMAction != OOMActionLog && c.OOMAction != OOMActionCancel { + return fmt.Errorf("unsupported OOMAction %v, TiDB only supports [%v, %v]", c.OOMAction, OOMActionLog, OOMActionCancel) + } // lower_case_table_names is allowed to be 0, 1, 2 if c.LowerCaseTableNames < 0 || c.LowerCaseTableNames > 2 { diff --git a/config/config_test.go b/config/config_test.go index 173878c62c0ed..a4cdf1fb9f0b2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -211,3 +211,21 @@ func (s *testConfigSuite) TestValid(c *C) { c.Assert(c1.Valid() == nil, Equals, tt.valid) } } + +func (s *testConfigSuite) TestOOMActionValid(c *C) { + c1 := NewConfig() + tests := []struct { + oomAction string + valid bool + }{ + {"log", true}, + {"Log", true}, + {"Cancel", true}, + {"cANceL", true}, + {"quit", false}, + } + for _, tt := range tests { + c1.OOMAction = tt.oomAction + c.Assert(c1.Valid() == nil, Equals, tt.valid) + } +} diff --git a/executor/executor.go b/executor/executor.go index 43017a0e25553..5a26b246f3adc 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1303,11 +1303,15 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { } switch config.GetGlobalConfig().OOMAction { case config.OOMActionCancel: - sc.MemTracker.SetActionOnExceed(&memory.PanicOnExceed{}) + action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} + action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) + sc.MemTracker.SetActionOnExceed(action) case config.OOMActionLog: - sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{}) + fallthrough default: - 
sc.MemTracker.SetActionOnExceed(&memory.LogOnExceed{}) + action := &memory.LogOnExceed{ConnID: ctx.GetSessionVars().ConnectionID} + action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota) + sc.MemTracker.SetActionOnExceed(action) } if execStmt, ok := s.(*ast.ExecuteStmt); ok { diff --git a/server/server.go b/server/server.go index b62d62fb2588d..f45cb65e6c7a7 100644 --- a/server/server.go +++ b/server/server.go @@ -490,8 +490,9 @@ func (s *Server) ShowProcessList() map[uint64]*util.ProcessInfo { if atomic.LoadInt32(&client.status) == connStatusWaitShutdown { continue } - pi := client.ctx.ShowProcess() - rs[pi.ID] = pi + if pi := client.ctx.ShowProcess(); pi != nil { + rs[pi.ID] = pi + } } s.rwlock.RUnlock() return rs diff --git a/tidb-server/main.go b/tidb-server/main.go index 38aa25018d731..8356c5fc4392f 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -516,7 +516,7 @@ func createServer() { svr, err = server.NewServer(cfg, driver) // Both domain and storage have started, so we have to clean them before exiting. terror.MustNil(err, closeDomainAndStorage) - go dom.ExpensiveQueryHandle().Run(svr) + go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run() } func serverShutdown(isgraceful bool) { diff --git a/util/expensivequery/expensivequery.go b/util/expensivequery/expensivequery.go index a8ec9aa45ec88..2689eac6092a8 100644 --- a/util/expensivequery/expensivequery.go +++ b/util/expensivequery/expensivequery.go @@ -32,15 +32,23 @@ import ( // Handle is the handler for expensive query. type Handle struct { exitCh chan struct{} + sm util.SessionManager } // NewExpensiveQueryHandle builds a new expensive query handler. func NewExpensiveQueryHandle(exitCh chan struct{}) *Handle { - return &Handle{exitCh} + return &Handle{exitCh: exitCh} +} + +// SetSessionManager sets the SessionManager which is used to fetch the info +// of all active sessions. 
+func (eqh *Handle) SetSessionManager(sm util.SessionManager) *Handle { + eqh.sm = sm + return eqh } // Run starts a expensive query checker goroutine at the start time of the server. -func (eqh *Handle) Run(sm util.SessionManager) { +func (eqh *Handle) Run() { threshold := atomic.LoadUint64(&variable.ExpensiveQueryTimeThreshold) curInterval := time.Second * time.Duration(threshold) ticker := time.NewTicker(curInterval / 2) @@ -50,7 +58,7 @@ func (eqh *Handle) Run(sm util.SessionManager) { if log.GetLevel() > zapcore.WarnLevel { continue } - processInfo := sm.ShowProcessList() + processInfo := eqh.sm.ShowProcessList() for _, info := range processInfo { if len(info.Info) == 0 || info.ExceedExpensiveTimeThresh { continue @@ -72,6 +80,18 @@ func (eqh *Handle) Run(sm util.SessionManager) { } } +// LogOnQueryExceedMemQuota prints a log when memory usage of connID is out of memory quota. +func (eqh *Handle) LogOnQueryExceedMemQuota(connID uint64) { + if log.GetLevel() > zapcore.WarnLevel { + return + } + info, ok := eqh.sm.GetProcessInfo(connID) + if !ok { + return + } + logExpensiveQuery(time.Since(info.Time), info) +} + // logExpensiveQuery logs the queries which exceed the time threshold or memory threshold. func logExpensiveQuery(costTime time.Duration, info *util.ProcessInfo) { logFields := make([]zap.Field, 0, 20) diff --git a/util/memory/action.go b/util/memory/action.go index 49f7651a69e8f..e8f79ef7a5b12 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -15,6 +15,7 @@ package memory import ( "context" + "fmt" "sync" "github.com/pingcap/parser/mysql" @@ -29,12 +30,22 @@ type ActionOnExceed interface { // Action will be called when memory usage exceeds memory quota by the // corresponding Tracker. Action(t *Tracker) + // SetLogHook binds a log hook which will be triggered and log a detailed + // message for the out-of-memory sql. + SetLogHook(hook func(uint64)) } // LogOnExceed logs a warning only once when memory usage exceeds memory quota. 
type LogOnExceed struct { - mutex sync.Mutex // For synchronization. - acted bool + mutex sync.Mutex // For synchronization. + acted bool + ConnID uint64 + logHook func(uint64) +} + +// SetLogHook sets a hook for LogOnExceed. +func (a *LogOnExceed) SetLogHook(hook func(uint64)) { + a.logHook = hook } // Action logs a warning only once when memory usage exceeds memory quota. @@ -43,16 +54,26 @@ func (a *LogOnExceed) Action(t *Tracker) { defer a.mutex.Unlock() if !a.acted { a.acted = true - logutil.Logger(context.Background()).Warn("memory exceeds quota", - zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String()))) - return + if a.logHook == nil { + logutil.Logger(context.Background()).Warn("memory exceeds quota", + zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.bytesLimit, t.String()))) + return + } + a.logHook(a.ConnID) } } // PanicOnExceed panics when memory usage exceeds memory quota. type PanicOnExceed struct { - mutex sync.Mutex // For synchronization. - acted bool + mutex sync.Mutex // For synchronization. + acted bool + ConnID uint64 + logHook func(uint64) +} + +// SetLogHook sets a hook for PanicOnExceed. +func (a *PanicOnExceed) SetLogHook(hook func(uint64)) { + a.logHook = hook } // Action panics when memory usage exceeds memory quota. @@ -64,7 +85,10 @@ func (a *PanicOnExceed) Action(t *Tracker) { } a.acted = true a.mutex.Unlock() - panic(PanicMemoryExceed + t.String()) + if a.logHook != nil { + a.logHook(a.ConnID) + } + panic(PanicMemoryExceed + fmt.Sprintf("[conn_id=%d]", a.ConnID)) } var ( diff --git a/util/memory/tracker_test.go b/util/memory/tracker_test.go index 11c6be4848c75..bf7ac98ecc506 100644 --- a/util/memory/tracker_test.go +++ b/util/memory/tracker_test.go @@ -98,6 +98,9 @@ type mockAction struct { called bool } +func (a *mockAction) SetLogHook(hook func(uint64)) { +} + func (a *mockAction) Action(t *Tracker) { a.called = true }