diff --git a/resourcemanager/BUILD.bazel b/resourcemanager/BUILD.bazel index 2b97ccbd25bdd..968a73b4a0f95 100644 --- a/resourcemanager/BUILD.bazel +++ b/resourcemanager/BUILD.bazel @@ -1,12 +1,19 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "resourcemanage", - srcs = ["rm.go"], + name = "resourcemanager", + srcs = [ + "rm.go", + "schedule.go", + ], importpath = "github.com/pingcap/tidb/resourcemanager", visibility = ["//visibility:public"], deps = [ + "//resourcemanager/scheduler", + "//resourcemanager/util", "//util", "//util/cpu", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", ], ) diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go index 1195176fbde95..35ac6afff960f 100644 --- a/resourcemanager/rm.go +++ b/resourcemanager/rm.go @@ -15,6 +15,10 @@ package resourcemanager import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/cpu" ) @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger() // ResourceManager is a resource manager type ResourceManager struct { + poolMap *util.ShardPoolMap + scheduler []scheduler.Scheduler cpuObserver *cpu.Observer + exitCh chan struct{} wg tidbutil.WaitGroupWrapper } // NewResourceManger is to create a new resource manager func NewResourceManger() *ResourceManager { + sc := make([]scheduler.Scheduler, 0, 1) + sc = append(sc, scheduler.NewCPUScheduler()) return &ResourceManager{ cpuObserver: cpu.NewCPUObserver(), + exitCh: make(chan struct{}), + poolMap: util.NewShardPoolMap(), + scheduler: sc, } } // Start is to start resource manager func (r *ResourceManager) Start() { r.wg.Run(r.cpuObserver.Start) + r.wg.Run(func() { + tick := time.NewTicker(100 * time.Millisecond) + defer tick.Stop() + for { + select { + case <-tick.C: + r.schedule() + case <-r.exitCh: + return + } + } + }) } // Stop is to stop resource manager func (r *ResourceManager) Stop() { r.cpuObserver.Stop() + close(r.exitCh) r.wg.Wait() } + +// Register is to register pool into resource manager +func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error { + p := util.PoolContainer{Pool: pool, Component: component} + return r.registerPool(name, &p) +} + +func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error { + return r.poolMap.Add(name, pool) +} diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go new file mode 100644 index 0000000000000..41560eed5c2a4 --- /dev/null +++ b/resourcemanager/schedule.go @@ -0,0 +1,69 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package resourcemanager + +import ( + "time" + + "github.com/pingcap/log" + "github.com/pingcap/tidb/resourcemanager/scheduler" + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/zap" +) + +func (r *ResourceManager) schedule() { + r.poolMap.Iter(func(pool *util.PoolContainer) { + cmd := r.schedulePool(pool) + r.exec(pool, cmd) + }) +} + +func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command { + for _, sch := range r.scheduler { + cmd := sch.Tune(pool.Component, pool.Pool) + switch cmd { + case scheduler.Hold: + continue + default: + return cmd + } + } + return scheduler.Hold +} + +func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) { + if cmd == scheduler.Hold { + return + } + if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond { + con := pool.Pool.Cap() + switch cmd { + case scheduler.Downclock: + concurrency := con - 1 + log.Info("downclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + case scheduler.Overclock: + concurrency := con + 1 + log.Info("overclock goroutine pool", + zap.Int("origin concurrency", con), + zap.Int("concurrency", concurrency), + zap.String("name", pool.Pool.Name())) + pool.Pool.Tune(concurrency) + } + } +} diff --git a/resourcemanager/scheduler/BUILD.bazel b/resourcemanager/scheduler/BUILD.bazel new file mode 100644 index 0000000000000..5dc17e8412d17 --- /dev/null +++ b/resourcemanager/scheduler/BUILD.bazel @@ -0,0 +1,16 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "scheduler", + srcs = [ + "cpu_scheduler.go", + "scheduler.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/scheduler", + visibility = ["//visibility:public"], + deps = [ + "//resourcemanager/util", + "//util/cpu", + "@org_uber_go_atomic//:atomic", + ], +) diff --git a/resourcemanager/scheduler/cpu_scheduler.go b/resourcemanager/scheduler/cpu_scheduler.go new file mode 100644 index 0000000000000..7d0bdf1d31a07 --- /dev/null +++ b/resourcemanager/scheduler/cpu_scheduler.go @@ -0,0 +1,44 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "github.com/pingcap/tidb/util/cpu" +) + +// CPUScheduler is a cpu scheduler +type CPUScheduler struct{} + +// NewCPUScheduler is to create a new cpu scheduler +func NewCPUScheduler() *CPUScheduler { + return &CPUScheduler{} +} + +// Tune is to tune the goroutine pool +func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command { + if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() { + return Hold + } + if cpu.GetCPUUsage() < 0.5 { + return Downclock + } + if cpu.GetCPUUsage() > 0.7 { + return Overclock + } + return Hold +} diff --git a/resourcemanager/scheduler/scheduler.go b/resourcemanager/scheduler/scheduler.go new file mode 100644 index 0000000000000..6cba0e18923cc --- /dev/null +++ b/resourcemanager/scheduler/scheduler.go @@ -0,0 +1,43 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package scheduler + +import ( + "time" + + "github.com/pingcap/tidb/resourcemanager/util" + "go.uber.org/atomic" +) + +var ( + minCPUSchedulerInterval = atomic.NewDuration(time.Minute) +) + +// Command is the command for scheduler +type Command int + +const ( + // Downclock is to reduce the number of concurrency. + Downclock Command = iota + // Hold is to hold the number of concurrency. + Hold + // Overclock is to increase the number of concurrency. + Overclock +) + +// Scheduler is a scheduler interface +type Scheduler interface { + Tune(component util.Component, p util.GorotinuePool) Command +} diff --git a/resourcemanager/util/BUILD.bazel b/resourcemanager/util/BUILD.bazel new file mode 100644 index 0000000000000..7688b26a93d93 --- /dev/null +++ b/resourcemanager/util/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "util", + srcs = [ + "mock_gpool.go", + "shard_pool_map.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/resourcemanager/util", + visibility = ["//visibility:public"], + deps = ["@com_github_pingcap_errors//:errors"], +) + +go_test( + name = "util_test", + srcs = ["shard_pool_map_test.go"], + embed = [":util"], + deps = ["@com_github_stretchr_testify//require"], +) diff --git a/resourcemanager/util/mock_gpool.go b/resourcemanager/util/mock_gpool.go new file mode 100644 index 0000000000000..b9e66dd9afeab --- /dev/null +++ b/resourcemanager/util/mock_gpool.go @@ -0,0 +1,97 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// MockGPool is only for test +type MockGPool struct { + name string +} + +// NewMockGPool is only for test +func NewMockGPool(name string) *MockGPool { + return &MockGPool{name: name} +} + +// Release is only for test +func (*MockGPool) Release() { + panic("implement me") +} + +// Tune is only for test +func (*MockGPool) Tune(_ int) { + panic("implement me") +} + +// LastTunerTs is only for test +func (*MockGPool) LastTunerTs() time.Time { + panic("implement me") +} + +// MaxInFlight is only for test +func (*MockGPool) MaxInFlight() int64 { + panic("implement me") +} + +// InFlight is only for test +func (*MockGPool) InFlight() int64 { + panic("implement me") +} + +// MinRT is only for test +func (*MockGPool) MinRT() uint64 { + panic("implement me") +} + +// MaxPASS is only for test +func (*MockGPool) MaxPASS() uint64 { + panic("implement me") +} + +// Cap is only for test +func (*MockGPool) Cap() int { + panic("implement me") +} + +// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. +func (*MockGPool) LongRTT() float64 { + panic("implement me") +} + +// UpdateLongRTT is only for test +func (*MockGPool) UpdateLongRTT(_ func(float64) float64) { + panic("implement me") +} + +// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. +func (*MockGPool) ShortRTT() uint64 { + panic("implement me") +} + +// GetQueueSize is only for test +func (*MockGPool) GetQueueSize() int64 { + panic("implement me") +} + +// Running is only for test +func (*MockGPool) Running() int { + panic("implement me") +} + +// Name is only for test +func (m *MockGPool) Name() string { + return m.name +} diff --git a/resourcemanager/util/shard_pool_map.go b/resourcemanager/util/shard_pool_map.go new file mode 100644 index 0000000000000..8819c56ed36fa --- /dev/null +++ b/resourcemanager/util/shard_pool_map.go @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "sync" + + "github.com/pingcap/errors" +) + +const shard = 8 + +func hash(key string) int { + return int(key[0]) % shard +} + +// ShardPoolMap is a map with shard +type ShardPoolMap struct { + pools [shard]poolMap +} + +// NewShardPoolMap creates a shard pool map +func NewShardPoolMap() *ShardPoolMap { + var result ShardPoolMap + for i := 0; i < shard; i++ { + result.pools[i] = newPoolMap() + } + return &result +} + +// Add adds a pool to the map +func (s *ShardPoolMap) Add(key string, pool *PoolContainer) error { + return s.pools[hash(key)].Add(key, pool) +} + +// Iter iterates the map +func (s *ShardPoolMap) Iter(fn func(pool *PoolContainer)) { + for i := 0; i < shard; i++ { + s.pools[i].Iter(fn) + } +} + +type poolMap struct { + mu sync.RWMutex + poolMap map[string]*PoolContainer +} + +func newPoolMap() poolMap { + return poolMap{poolMap: make(map[string]*PoolContainer)} +} + +func (p *poolMap) Add(key string, pool *PoolContainer) error { + p.mu.Lock() + defer p.mu.Unlock() + if _, contain := p.poolMap[key]; contain { + return errors.New("pool is already exist") + } + p.poolMap[key] = pool + return nil +} + +func (p *poolMap) Iter(fn func(pool *PoolContainer)) { + p.mu.RLock() + defer p.mu.RUnlock() + for _, pool := range p.poolMap { + fn(pool) + } +} diff --git a/resourcemanager/util/shard_pool_map_test.go b/resourcemanager/util/shard_pool_map_test.go new file mode 100644 index 0000000000000..34e0a11ca5976 --- /dev/null +++ b/resourcemanager/util/shard_pool_map_test.go @@ -0,0 +1,38 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "strconv" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestShardPoolMap(t *testing.T) { + rc := 10 + pm := NewShardPoolMap() + for i := 0; i < rc; i++ { + id := strconv.FormatInt(int64(i), 10) + require.NoError(t, pm.Add(id, &PoolContainer{Pool: NewMockGPool(id), Component: DDL})) + } + require.Error(t, pm.Add("1", &PoolContainer{Pool: NewMockGPool("1"), Component: DDL})) + var cnt atomic.Int32 + pm.Iter(func(pool *PoolContainer) { + cnt.Add(1) + }) + require.Equal(t, rc, int(cnt.Load())) +} diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go new file mode 100644 index 0000000000000..d5b988c344295 --- /dev/null +++ b/resourcemanager/util/util.go @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "time" + +// GorotinuePool is a pool interface +type GorotinuePool interface { + Release() + Tune(size int) + LastTunerTs() time.Time + MaxInFlight() int64 + InFlight() int64 + MinRT() uint64 + MaxPASS() uint64 + Cap() int + // LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT. + LongRTT() float64 + UpdateLongRTT(f func(float64) float64) + // ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT. + ShortRTT() uint64 + GetQueueSize() int64 + Running() int + Name() string +} + +// PoolContainer is a pool container +type PoolContainer struct { + Pool GorotinuePool + Component Component +} + +// Component is ID for difference component +type Component int + +const ( + // UNKNOWN is for unknown component. It is only for test + UNKNOWN Component = iota + // DDL is for ddl component + DDL +) diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 361a929351642..b918b7e1ca5f8 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -21,7 +21,7 @@ go_library( "//planner/core", "//plugin", "//privilege/privileges", - "//resourcemanager:resourcemanage", + "//resourcemanager", "//server", "//session", "//session/txninfo",