Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sysvar, config: introduce tidb_connection_concurrency_limit as an instance scope sysvar. #34662

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a064adf
sysvar,config: introduce tidb_connection_concurrency_limit
CbcWestwolf May 15, 2022
d33a28b
Merge branch 'master' into change_token_limit
hawkingrei May 15, 2022
7b8a7e4
Merge branch 'master' into change_token_limit
CbcWestwolf May 16, 2022
4c71453
Fix
CbcWestwolf May 17, 2022
58d3385
Merge branch 'change_token_limit' of github.com:CbcWestwolf/tidb into…
CbcWestwolf May 17, 2022
6038f07
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf May 18, 2022
ae1df77
Introduce TokenLimiter.Resize()
CbcWestwolf May 18, 2022
0ad5567
Fix check_dev
CbcWestwolf May 19, 2022
e9ba4a9
Refactor
CbcWestwolf May 19, 2022
f25f0b6
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf May 19, 2022
51c4be7
Fix
CbcWestwolf May 19, 2022
155f981
Fix
CbcWestwolf May 19, 2022
d635b76
Merge branch 'master' into change_token_limit
CbcWestwolf May 22, 2022
1d35815
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf Jun 8, 2022
8d73590
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf Jun 13, 2022
ae1e2b9
Merge branch 'master' into change_token_limit
morgo Jun 18, 2022
0065286
Merge branch 'change_token_limit' of github.com:CbcWestwolf/tidb into…
CbcWestwolf Aug 4, 2022
3a88c94
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf Aug 4, 2022
2985c9a
Merge branch 'master' of github.com:pingcap/tidb into change_token_limit
CbcWestwolf Aug 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ const (
DefExpensiveQueryTimeThreshold = 60
// DefMemoryUsageAlarmRatio is the threshold triggering an alarm which the memory usage of tidb-server instance exceeds.
DefMemoryUsageAlarmRatio = 0.8
// DefConnectionConcurrencyLimit is the number of sessions that can execute requests concurrently.
DefConnectionConcurrencyLimit = 1000
MaxConnectionConcurrencyLimit = 4294967295
)

// Valid config maps
Expand Down Expand Up @@ -117,6 +120,7 @@ var (
"enable-collect-execution-info": "tidb_enable_collect_execution_info",
"plugin.load": "plugin_load",
"plugin.dir": "plugin_dir",
"token-limit": "tidb_connection_concurrency_limit",
},
},
{
Expand Down Expand Up @@ -159,7 +163,7 @@ type Config struct {
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
TokenLimit uint32 `toml:"token-limit" json:"token-limit"`
OOMUseTmpStorage bool `toml:"oom-use-tmp-storage" json:"oom-use-tmp-storage"`
TempStoragePath string `toml:"tmp-storage-path" json:"tmp-storage-path"`
// TempStorageQuota describe the temporary storage Quota during query exector when OOMUseTmpStorage is enabled
Expand Down Expand Up @@ -464,6 +468,7 @@ type Instance struct {
EnableCollectExecutionInfo bool `toml:"tidb_enable_collect_execution_info" json:"tidb_enable_collect_execution_info"`
PluginDir string `toml:"plugin_dir" json:"plugin_dir"`
PluginLoad string `toml:"plugin_load" json:"plugin_load"`
ConnectionConcurrencyLimit uint32 `toml:"tidb_connection_concurrency_limit" json:"tidb_connection_concurrency_limit"`
}

func (l *Log) getDisableTimestamp() bool {
Expand Down Expand Up @@ -808,6 +813,7 @@ var defaultConf = Config{
EnableCollectExecutionInfo: true,
PluginDir: "/data/deploy/plugin",
PluginLoad: "",
ConnectionConcurrencyLimit: DefConnectionConcurrencyLimit,
},
Status: Status{
ReportStatus: true,
Expand Down Expand Up @@ -1101,6 +1107,9 @@ func (c *Config) Valid() error {
if c.IndexLimit < DefIndexLimit || c.IndexLimit > DefMaxOfIndexLimit {
return fmt.Errorf("index-limit should be [%d, %d]", DefIndexLimit, DefMaxOfIndexLimit)
}
if c.TokenLimit < 1 || c.TokenLimit > MaxConnectionConcurrencyLimit {
return fmt.Errorf("token-limit must be greater than or equal to 1 and less than or equal to %d", MaxConnectionConcurrencyLimit)
}
if c.Log.File.MaxSize > MaxLogFileSize {
return fmt.Errorf("invalid max log file size=%v which is larger than max=%v", c.Log.File.MaxSize, MaxLogFileSize)
}
Expand All @@ -1126,6 +1135,9 @@ func (c *Config) Valid() error {
return fmt.Errorf("tidb_memory_usage_alarm_ratio in [Instance] must be greater than or equal to 0 and less than or equal to 1")
}

if c.Instance.ConnectionConcurrencyLimit < 1 || c.Instance.ConnectionConcurrencyLimit > MaxConnectionConcurrencyLimit {
return fmt.Errorf("tidb_connection_concurrency_limit in [Instance] must be greater than or equal to 1 and less than or equal to %d", MaxConnectionConcurrencyLimit)
}
if len(c.IsolationRead.Engines) < 1 {
return fmt.Errorf("the number of [isolation-read]engines for isolation read should be at least 1")
}
Expand Down
6 changes: 3 additions & 3 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ lease = "45s"
# turn off this option if there will be a large number of tables created.
split-table = true

# The limit of concurrent executed sessions.
token-limit = 1000

# Controls whether to enable the temporary storage for some operators when a single SQL statement exceeds the memory quota specified by the memory quota.
oom-use-tmp-storage = true

Expand Down Expand Up @@ -468,3 +465,6 @@ tidb_slow_log_threshold = 300
# tidb_record_plan_in_slow_log is used to enable record query plan in slow log.
# 0 is disable. 1 is enable.
tidb_record_plan_in_slow_log = 1

# The limit of concurrent executed connections.
tidb_connection_concurrency_limit = 1000
13 changes: 8 additions & 5 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ unrecognized-option-test = true
require.NoError(t, err)

_, err = f.WriteString(`
token-limit = 0
enable-table-lock = true
alter-primary-key = true
delay-clean-table-lock = 5
Expand Down Expand Up @@ -279,7 +278,6 @@ grpc-max-send-msg-size = 40960
require.Equal(t, uint(6000), conf.TiKVClient.RegionCacheTTL)
require.Equal(t, int64(0), conf.TiKVClient.StoreLimit)
require.Equal(t, int64(8192), conf.TiKVClient.TTLRefreshedTxnSize)
require.Equal(t, uint(1000), conf.TokenLimit)
require.True(t, conf.EnableTableLock)
require.Equal(t, uint64(5), conf.DelayCleanTableLock)
require.Equal(t, uint64(10000), conf.SplitRegionMaxNum)
Expand Down Expand Up @@ -509,7 +507,10 @@ func TestConflictInstanceConfig(t *testing.T) {
// Just receive a warning and keep their respective values.
expectedConflictOptions := map[string]InstanceConfigSection{
"": {
"", map[string]string{"check-mb4-value-in-utf8": "tidb_check_mb4_value_in_utf8"},
"", map[string]string{
"check-mb4-value-in-utf8": "tidb_check_mb4_value_in_utf8",
"token-limit": "tidb_connection_concurrency_limit",
},
},
"log": {
"log", map[string]string{"enable-slow-log": "tidb_enable_slow_log"},
Expand All @@ -518,17 +519,19 @@ func TestConflictInstanceConfig(t *testing.T) {
"performance", map[string]string{"force-priority": "tidb_force_priority"},
},
}
_, err = f.WriteString("check-mb4-value-in-utf8 = true \n" +
_, err = f.WriteString("check-mb4-value-in-utf8 = true \ntoken-limit = 10 \n" +
"[log] \nenable-slow-log = true \n" +
"[performance] \nforce-priority = \"NO_PRIORITY\"\n" +
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"")
"[instance] \ntidb_check_mb4_value_in_utf8 = false \ntidb_connection_concurrency_limit = 100 \ntidb_enable_slow_log = false \ntidb_force_priority = \"LOW_PRIORITY\"")
require.NoError(t, err)
require.NoError(t, f.Sync())
err = conf.Load(configFile)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "Conflict configuration options exists on both [instance] section and some other sections."))
require.False(t, conf.Instance.CheckMb4ValueInUTF8.Load())
require.True(t, conf.CheckMb4ValueInUTF8.Load())
require.Equal(t, uint32(10), conf.TokenLimit)
require.Equal(t, uint32(100), conf.Instance.ConnectionConcurrencyLimit)
require.Equal(t, true, conf.Log.EnableSlowLog.Load())
require.Equal(t, false, conf.Instance.EnableSlowLog.Load())
require.Equal(t, "NO_PRIORITY", conf.Performance.ForcePriority)
Expand Down
10 changes: 8 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ func (s *Server) releaseToken(token *Token) {
metrics.TokenGauge.Dec()
}

// SetConnectionConcurrencyLimit updates the size of concurrentLimiter.
func (s *Server) SetConnectionConcurrencyLimit(count uint32) {
s.concurrentLimiter.Resize(uint(count))
}

// SetDomain use to set the server domain.
func (s *Server) SetDomain(dom *domain.Domain) {
s.dom = dom
Expand Down Expand Up @@ -193,7 +198,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s := &Server{
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
concurrentLimiter: NewTokenLimiter(uint(cfg.Instance.ConnectionConcurrencyLimit)),
Comment on lines -195 to +200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will just be passed to server/ when the server starts?

If so, it will need to be refactored, since making changes to the sysvar won't apply until restart (which is a problem for instance vars, since the values are lost on restart too).

clients: make(map[uint64]*clientConn),
globalConnID: util.NewGlobalConnID(0, true),
internalSessions: make(map[interface{}]struct{}, 100),
Expand Down Expand Up @@ -302,6 +307,7 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
rand.Seed(time.Now().UTC().UnixNano())

variable.RegisterStatistics(s)
variable.SetConnectionConcurrencyLimit = s.SetConnectionConcurrencyLimit

return s, nil
}
Expand Down Expand Up @@ -351,7 +357,7 @@ func setTxnScope() {

// Export config-related metrics
func (s *Server) reportConfig() {
metrics.ConfigStatus.WithLabelValues("token-limit").Set(float64(s.cfg.TokenLimit))
metrics.ConfigStatus.WithLabelValues("tidb_connection_concurrency_limit").Set(float64(s.cfg.Instance.ConnectionConcurrencyLimit))
metrics.ConfigStatus.WithLabelValues("max-server-connections").Set(float64(s.cfg.MaxServerConnections))
}

Expand Down
24 changes: 24 additions & 0 deletions server/tokenlimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package server

import (
"sync"
)

// Token is used as a permission to keep on running.
type Token struct {
}
Expand All @@ -22,18 +26,38 @@ type Token struct {
type TokenLimiter struct {
count uint
ch chan *Token
mutex sync.Mutex // For synchronization.
}

// Put releases the token.
func (tl *TokenLimiter) Put(tk *Token) {
tl.mutex.Lock()
defer tl.mutex.Unlock()
tl.ch <- tk
}

// Get obtains a token.
func (tl *TokenLimiter) Get() *Token {
tl.mutex.Lock()
defer tl.mutex.Unlock()
return <-tl.ch
}

// Resize adjusts the size of tl.ch
func (tl *TokenLimiter) Resize(count uint) {
if tl.count < count || tl.count > count && tl.count-uint(len(tl.ch)) <= count {
tl.mutex.Lock()
defer tl.mutex.Unlock()

newCh := make(chan *Token, count)
for i := uint(0); i < count-tl.count+uint(len(tl.ch)) && i < count; i++ {
newCh <- &Token{}
}
tl.count = count
tl.ch = newCh
}
}

// NewTokenLimiter creates a TokenLimiter with count tokens.
func NewTokenLimiter(count uint) *TokenLimiter {
tl := &TokenLimiter{count: count, ch: make(chan *Token, count)}
Expand Down
11 changes: 11 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,17 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeInstance, Name: PluginDir, Value: "/data/deploy/plugin", ReadOnly: true, GetGlobal: func(s *SessionVars) (string, error) {
return config.GetGlobalConfig().Instance.PluginDir, nil
}},
{Scope: ScopeInstance, Name: TiDBConnectionConcurrencyLimit, Value: strconv.Itoa(int(config.GetGlobalConfig().Instance.ConnectionConcurrencyLimit)), Type: TypeInt, MinValue: 1, MaxValue: config.MaxConnectionConcurrencyLimit, SetGlobal: func(s *SessionVars, val string) error {
oldVal := atomic.LoadUint32(&config.GetGlobalConfig().Instance.ConnectionConcurrencyLimit)
ival := uint32(TidbOptInt64(val, int64(config.GetGlobalConfig().Instance.ConnectionConcurrencyLimit)))
if oldVal != ival {
SetConnectionConcurrencyLimit(ival)
}
atomic.StoreUint32(&config.GetGlobalConfig().Instance.ConnectionConcurrencyLimit, ival)
return nil
}, GetGlobal: func(s *SessionVars) (string, error) {
return strconv.FormatInt(int64(atomic.LoadUint32(&config.GetGlobalConfig().Instance.ConnectionConcurrencyLimit)), 10), nil
}},

/* The system variables below have GLOBAL scope */
{Scope: ScopeGlobal, Name: MaxPreparedStmtCount, Value: strconv.FormatInt(DefMaxPreparedStmtCount, 10), Type: TypeInt, MinValue: -1, MaxValue: 1048576},
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ const (
// It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED"
TiDBForcePriority = "tidb_force_priority"

// TiDBConnectionConcurrencyLimit indicates the number of sessions that can execute requests concurrently.
TiDBConnectionConcurrencyLimit = "tidb_connection_concurrency_limit"

// TiDBConstraintCheckInPlace indicates to check the constraint when the SQL executing.
// It could hurt the performance of bulking insert when it is ON.
TiDBConstraintCheckInPlace = "tidb_constraint_check_in_place"
Expand Down Expand Up @@ -941,4 +944,6 @@ var (
GetMemQuotaAnalyze func() int64 = nil
// SetStatsCacheCapacity is the func registered by domain to set statsCache memory quota.
SetStatsCacheCapacity atomic.Value
// SetConnectionConcurrencyLimit is the func registered by server to set the limit of connection number.
SetConnectionConcurrencyLimit func(c uint32) = nil
)
4 changes: 3 additions & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func overrideConfig(cfg *config.Config) {
cfg.Lease = *ddlLease
}
if actualFlags[nmTokenLimit] {
cfg.TokenLimit = uint(*tokenLimit)
cfg.Instance.ConnectionConcurrencyLimit = uint32(*tokenLimit)
}
if actualFlags[nmPluginLoad] {
cfg.Instance.PluginLoad = *pluginLoad
Expand Down Expand Up @@ -558,6 +558,8 @@ func setGlobalVars() {
cfg.Instance.PluginLoad = cfg.Plugin.Load
case "plugin.dir":
cfg.Instance.PluginDir = cfg.Plugin.Dir
case "token-limit":
cfg.Instance.ConnectionConcurrencyLimit = cfg.TokenLimit
}
case "log":
switch oldName {
Expand Down