Skip to content

Commit

Permalink
*: support global memory control for tidb (#37794)
Browse files Browse the repository at this point in the history
ref #37816
  • Loading branch information
wshwsh12 authored Sep 26, 2022
1 parent 98c5ad3 commit 4e4169b
Show file tree
Hide file tree
Showing 11 changed files with 400 additions and 32 deletions.
56 changes: 32 additions & 24 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/servermemorylimit"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/txnkv/transaction"
pd "github.com/tikv/pd/client"
Expand All @@ -87,30 +88,31 @@ func NewMockDomain() *Domain {
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
wg util.WaitGroupWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
store kv.Storage
infoCache *infoschema.InfoCache
privHandle *privileges.Handle
bindHandle *bindinfo.BindHandle
statsHandle unsafe.Pointer
statsLease time.Duration
ddl ddl.DDL
info *infosync.InfoSyncer
globalCfgSyncer *globalconfigsync.GlobalConfigSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
sysVarCache sysVarCache // replaces GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle
serverMemoryLimitHandle *servermemorylimit.Handle
wg util.WaitGroupWrapper
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -885,6 +887,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio

do.SchemaValidator = NewSchemaValidator(ddlLease, do)
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
do.serverMemoryLimitHandle = servermemorylimit.NewServerMemoryLimitHandle(do.exit)
do.sysProcesses = SysProcesses{mu: &sync.RWMutex{}, procMap: make(map[uint64]sessionctx.Context)}
variable.SetStatsCacheCapacity.Store(do.SetStatsCacheCapacity)
return do
Expand Down Expand Up @@ -1818,6 +1821,11 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

// ServerMemoryLimitHandle returns the expensive query handle.
func (do *Domain) ServerMemoryLimitHandle() *servermemorylimit.Handle {
return do.serverMemoryLimitHandle
}

const (
privilegeKey = "/tidb/privilege"
sysVarCacheKey = "/tidb/sysvars"
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else {
sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
sc.MemTracker.IsRootTrackerOfSess, sc.MemTracker.SessionID = true, vars.ConnectionID
}

sc.InitDiskTracker(memory.LabelForSQLText, -1)
Expand Down
63 changes: 63 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6063,3 +6063,66 @@ func TestTableLockPrivilege(t *testing.T) {
tk2.MustExec("LOCK TABLE test.t WRITE, test2.t2 WRITE")
tk.MustExec("LOCK TABLE test.t WRITE, test2.t2 WRITE")
}

func TestGlobalMemoryControl(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_limit = 512 << 20")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

tk1 := testkit.NewTestKit(t, store)
tracker1 := tk1.Session().GetSessionVars().StmtCtx.MemTracker

tk2 := testkit.NewTestKit(t, store)
tracker2 := tk2.Session().GetSessionVars().StmtCtx.MemTracker

tk3 := testkit.NewTestKit(t, store)
tracker3 := tk3.Session().GetSessionVars().StmtCtx.MemTracker

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk1.Session().ShowProcess(), tk2.Session().ShowProcess(), tk3.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tracker1.Consume(100 << 20) // 100 MB
tracker2.Consume(200 << 20) // 200 MB
tracker3.Consume(300 << 20) // 300 MB

test := make([]int, 128<<20) // Keep 1GB HeapInUse
time.Sleep(500 * time.Millisecond) // The check goroutine checks the memory usage every 100ms. The Sleep() make sure that Top1Tracker can be Canceled.

// Kill Top1
require.False(t, tracker1.NeedKill.Load())
require.False(t, tracker2.NeedKill.Load())
require.True(t, tracker3.NeedKill.Load())
require.Equal(t, memory.MemUsageTop1Tracker.Load(), tracker3)
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker3.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
tracker2.Consume(300 << 20) // Sum 500MB, Not Panic, Waiting t3 cancel finish.
time.Sleep(500 * time.Millisecond)
require.False(t, tracker2.NeedKill.Load())
// Kill Finished
tracker3.Consume(-(300 << 20))
// Simulated SQL is Canceled and the time is updated
sm.PSMu.Lock()
ps := *sm.PS[2]
ps.Time = time.Now()
sm.PS[2] = &ps
sm.PSMu.Unlock()
time.Sleep(500 * time.Millisecond)
// Kill the Next SQL
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker2.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
require.Equal(t, test[0], 0) // Keep 1GB HeapInUse
}
48 changes: 48 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,54 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBServerMemoryLimit, Value: strconv.FormatUint(DefTiDBServerMemoryLimit, 10), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64,
GetGlobal: func(s *SessionVars) (string, error) {
return memory.ServerMemoryLimit.String(), nil
},
Validation: func(s *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
intVal, err := strconv.ParseUint(normalizedValue, 10, 64)
if err != nil {
return "", err
}
if intVal > 0 && intVal < (512<<20) { // 512 MB
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.GenWithStackByArgs(TiDBServerMemoryLimit, originalValue))
intVal = 512 << 20
}
return strconv.FormatUint(intVal, 10), nil
},
SetGlobal: func(s *SessionVars, val string) error {
intVal, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return err
}
memory.ServerMemoryLimit.Store(intVal)
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBServerMemoryLimitSessMinSize, Value: strconv.FormatUint(DefTiDBServerMemoryLimitSessMinSize, 10), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64,
GetGlobal: func(s *SessionVars) (string, error) {
return memory.ServerMemoryLimitSessMinSize.String(), nil
},
Validation: func(s *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
intVal, err := strconv.ParseUint(normalizedValue, 10, 64)
if err != nil {
return "", err
}
if intVal > 0 && intVal < 128 { // 128 Bytes
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.GenWithStackByArgs(TiDBServerMemoryLimitSessMinSize, originalValue))
intVal = 128
}
return strconv.FormatUint(intVal, 10), nil
},
SetGlobal: func(s *SessionVars, val string) error {
intVal, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return err
}
memory.ServerMemoryLimitSessMinSize.Store(intVal)
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBEnableColumnTracking, Value: BoolToOnOff(DefTiDBEnableColumnTracking), Type: TypeBool, GetGlobal: func(s *SessionVars) (string, error) {
return BoolToOnOff(EnableColumnTracking.Load()), nil
}, SetGlobal: func(s *SessionVars, val string) error {
Expand Down
78 changes: 78 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package variable
import (
"encoding/json"
"fmt"
"math"
"strconv"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -746,3 +747,80 @@ func TestSetTIDBDiskQuota(t *testing.T) {
require.NoError(t, err)
require.Equal(t, strconv.FormatInt(pb, 10), val)
}

func TestTiDBServerMemoryLimit(t *testing.T) {
vars := NewSessionVars()
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
var (
mb uint64 = 1 << 20
err error
val string
)
// Test tidb_server_memory_limit
serverMemoryLimit := GetSysVar(TiDBServerMemoryLimit)
// Check default value
require.Equal(t, serverMemoryLimit.Value, strconv.FormatUint(DefTiDBServerMemoryLimit, 10))

// MinValue is 512 MB
err = mock.SetGlobalSysVar(TiDBServerMemoryLimit, strconv.FormatUint(100*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimit)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(512*mb, 10), val)

// Test Close
err = mock.SetGlobalSysVar(TiDBServerMemoryLimit, strconv.FormatUint(0, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimit)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(0, 10), val)

// Test MaxValue
err = mock.SetGlobalSysVar(TiDBServerMemoryLimit, strconv.FormatUint(math.MaxUint64, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimit)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(math.MaxUint64, 10), val)

// Test Normal Value
err = mock.SetGlobalSysVar(TiDBServerMemoryLimit, strconv.FormatUint(1024*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimit)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(1024*mb, 10), val)

// Test tidb_server_memory_limit_sess_min_size
serverMemoryLimitSessMinSize := GetSysVar(TiDBServerMemoryLimitSessMinSize)
// Check default value
require.Equal(t, serverMemoryLimitSessMinSize.Value, strconv.FormatUint(DefTiDBServerMemoryLimitSessMinSize, 10))

// MinValue is 128 Bytes
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(100, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(128, 10), val)

// Test Close
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(0, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(0, 10), val)

// Test MaxValue
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(math.MaxUint64, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(math.MaxUint64, 10), val)

// Test Normal Value
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(200*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(200*mb, 10), val)
}
11 changes: 11 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable/featuretag/concurrencyddl"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/paging"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -837,6 +839,10 @@ const (
TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg"
// TiDBDDLDiskQuota used to set disk quota for lightning add index.
TiDBDDLDiskQuota = "tidb_ddl_disk_quota"
// TiDBServerMemoryLimit indicates the memory limit of the tidb-server instance.
TiDBServerMemoryLimit = "tidb_server_memory_limit"
// TiDBServerMemoryLimitSessMinSize indicates the minimal memory used of a session, that becomes a candidate for session kill.
TiDBServerMemoryLimitSessMinSize = "tidb_server_memory_limit_sess_min_size"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1072,6 +1078,7 @@ const (
DefTiDBForeignKeyChecks = false
DefTiDBOptRangeMaxSize = 0
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
)

// Process global variables.
Expand Down Expand Up @@ -1123,6 +1130,10 @@ var (
// EnableForeignKey indicates whether to enable foreign key feature.
EnableForeignKey = atomic.NewBool(false)
EnableRCReadCheckTS = atomic.NewBool(false)

// DefTiDBServerMemoryLimit indicates the default value of TiDBServerMemoryLimit(TotalMem * 80%).
// It should be a const and shouldn't be modified after tidb is started.
DefTiDBServerMemoryLimit = mathutil.Max(memory.GetMemTotalIgnoreErr()/10*8, 512<<20)
)

var (
Expand Down
5 changes: 5 additions & 0 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// MockSessionManager is a mocked session manager which is used for test.
type MockSessionManager struct {
PS []*util.ProcessInfo
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
conn map[uint64]session.Session
Expand All @@ -51,6 +52,8 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
ret := make(map[uint64]*util.ProcessInfo)
if len(msm.PS) > 0 {
for _, item := range msm.PS {
Expand All @@ -68,6 +71,8 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {

// GetProcessInfo implements the SessionManager.GetProcessInfo interface.
func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
for _, item := range msm.PS {
if item.ID == id {
return item, true
Expand Down
1 change: 1 addition & 0 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ func createServer(storage kv.Storage, dom *domain.Domain) *server.Server {
svr.SetDomain(dom)
svr.InitGlobalConnID(dom.ServerID)
go dom.ExpensiveQueryHandle().SetSessionManager(svr).Run()
go dom.ServerMemoryLimitHandle().SetSessionManager(svr).Run()
dom.InfoSyncer().SetSessionManager(svr)
return svr
}
Expand Down
8 changes: 8 additions & 0 deletions util/memory/meminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ var MemTotal func() (uint64, error)
// MemUsed returns the total used amount of RAM on this system
var MemUsed func() (uint64, error)

// GetMemTotalIgnoreErr returns the total amount of RAM on this system/container. If error occurs, return 0.
func GetMemTotalIgnoreErr() uint64 {
if memTotal, err := MemTotal(); err == nil {
return memTotal
}
return 0
}

// MemTotalNormal returns the total amount of RAM on this system in non-container environment.
func MemTotalNormal() (uint64, error) {
total, t := memLimit.get()
Expand Down
Loading

0 comments on commit 4e4169b

Please sign in to comment.