Skip to content

Commit

Permalink
resourcemanger: add cpu scheduler (#39886)
Browse files Browse the repository at this point in the history
close #39657
  • Loading branch information
hawkingrei authored Jan 5, 2023
1 parent 508b601 commit 4a5a447
Show file tree
Hide file tree
Showing 12 changed files with 505 additions and 3 deletions.
11 changes: 9 additions & 2 deletions resourcemanager/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "resourcemanage",
srcs = ["rm.go"],
name = "resourcemanager",
srcs = [
"rm.go",
"schedule.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/scheduler",
"//resourcemanager/util",
"//util",
"//util/cpu",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)
35 changes: 35 additions & 0 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package resourcemanager

import (
"time"

"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpu"
)
Expand All @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger()

// ResourceManager is a resource manager
type ResourceManager struct {
poolMap *util.ShardPoolMap
scheduler []scheduler.Scheduler
cpuObserver *cpu.Observer
exitCh chan struct{}
wg tidbutil.WaitGroupWrapper
}

// NewResourceManger is to create a new resource manager
func NewResourceManger() *ResourceManager {
sc := make([]scheduler.Scheduler, 0, 1)
sc = append(sc, scheduler.NewCPUScheduler())
return &ResourceManager{
cpuObserver: cpu.NewCPUObserver(),
exitCh: make(chan struct{}),
poolMap: util.NewShardPoolMap(),
scheduler: sc,
}
}

// Start is to start resource manager
func (r *ResourceManager) Start() {
r.wg.Run(r.cpuObserver.Start)
r.wg.Run(func() {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
r.schedule()
case <-r.exitCh:
return
}
}
})
}

// Stop is to stop resource manager
func (r *ResourceManager) Stop() {
r.cpuObserver.Stop()
close(r.exitCh)
r.wg.Wait()
}

// Register is to register pool into resource manager
func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error {
p := util.PoolContainer{Pool: pool, Component: component}
return r.registerPool(name, &p)
}

func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error {
return r.poolMap.Add(name, pool)
}
69 changes: 69 additions & 0 deletions resourcemanager/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resourcemanager

import (
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/zap"
)

func (r *ResourceManager) schedule() {
r.poolMap.Iter(func(pool *util.PoolContainer) {
cmd := r.schedulePool(pool)
r.exec(pool, cmd)
})
}

func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command {
for _, sch := range r.scheduler {
cmd := sch.Tune(pool.Component, pool.Pool)
switch cmd {
case scheduler.Hold:
continue
default:
return cmd
}
}
return scheduler.Hold
}

func (*ResourceManager) exec(pool *util.PoolContainer, cmd scheduler.Command) {
if cmd == scheduler.Hold {
return
}
if time.Since(pool.Pool.LastTunerTs()) > 200*time.Millisecond {
con := pool.Pool.Cap()
switch cmd {
case scheduler.Downclock:
concurrency := con - 1
log.Info("downclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
pool.Pool.Tune(concurrency)
case scheduler.Overclock:
concurrency := con + 1
log.Info("overclock goroutine pool",
zap.Int("origin concurrency", con),
zap.Int("concurrency", concurrency),
zap.String("name", pool.Pool.Name()))
pool.Pool.Tune(concurrency)
}
}
}
16 changes: 16 additions & 0 deletions resourcemanager/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "scheduler",
srcs = [
"cpu_scheduler.go",
"scheduler.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/scheduler",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/util",
"//util/cpu",
"@org_uber_go_atomic//:atomic",
],
)
44 changes: 44 additions & 0 deletions resourcemanager/scheduler/cpu_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"github.com/pingcap/tidb/util/cpu"
)

// CPUScheduler is a cpu scheduler
type CPUScheduler struct{}

// NewCPUScheduler is to create a new cpu scheduler
func NewCPUScheduler() *CPUScheduler {
return &CPUScheduler{}
}

// Tune is to tune the goroutine pool
func (*CPUScheduler) Tune(_ util.Component, pool util.GorotinuePool) Command {
if time.Since(pool.LastTunerTs()) < minCPUSchedulerInterval.Load() {
return Hold
}
if cpu.GetCPUUsage() < 0.5 {
return Downclock
}
if cpu.GetCPUUsage() > 0.7 {
return Overclock
}
return Hold
}
43 changes: 43 additions & 0 deletions resourcemanager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"time"

"github.com/pingcap/tidb/resourcemanager/util"
"go.uber.org/atomic"
)

var (
minCPUSchedulerInterval = atomic.NewDuration(time.Minute)
)

// Command is the command for scheduler
type Command int

const (
// Downclock is to reduce the number of concurrency.
Downclock Command = iota
// Hold is to hold the number of concurrency.
Hold
// Overclock is to increase the number of concurrency.
Overclock
)

// Scheduler is a scheduler interface
type Scheduler interface {
Tune(component util.Component, p util.GorotinuePool) Command
}
20 changes: 20 additions & 0 deletions resourcemanager/util/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "util",
srcs = [
"mock_gpool.go",
"shard_pool_map.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager/util",
visibility = ["//visibility:public"],
deps = ["@com_github_pingcap_errors//:errors"],
)

go_test(
name = "util_test",
srcs = ["shard_pool_map_test.go"],
embed = [":util"],
deps = ["@com_github_stretchr_testify//require"],
)
97 changes: 97 additions & 0 deletions resourcemanager/util/mock_gpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import "time"

// MockGPool is only for test
type MockGPool struct {
name string
}

// NewMockGPool is only for test
func NewMockGPool(name string) *MockGPool {
return &MockGPool{name: name}
}

// Release is only for test
func (*MockGPool) Release() {
panic("implement me")
}

// Tune is only for test
func (*MockGPool) Tune(_ int) {
panic("implement me")
}

// LastTunerTs is only for test
func (*MockGPool) LastTunerTs() time.Time {
panic("implement me")
}

// MaxInFlight is only for test
func (*MockGPool) MaxInFlight() int64 {
panic("implement me")
}

// InFlight is only for test
func (*MockGPool) InFlight() int64 {
panic("implement me")
}

// MinRT is only for test
func (*MockGPool) MinRT() uint64 {
panic("implement me")
}

// MaxPASS is only for test
func (*MockGPool) MaxPASS() uint64 {
panic("implement me")
}

// Cap is only for test
func (*MockGPool) Cap() int {
panic("implement me")
}

// LongRTT is to represent the baseline latency by tracking a measurement of the long term, less volatile RTT.
func (*MockGPool) LongRTT() float64 {
panic("implement me")
}

// UpdateLongRTT is only for test
func (*MockGPool) UpdateLongRTT(_ func(float64) float64) {
panic("implement me")
}

// ShortRTT is to represent the current system latency by tracking a measurement of the short time, and more volatile RTT.
func (*MockGPool) ShortRTT() uint64 {
panic("implement me")
}

// GetQueueSize is only for test
func (*MockGPool) GetQueueSize() int64 {
panic("implement me")
}

// Running is only for test
func (*MockGPool) Running() int {
panic("implement me")
}

// Name is only for test
func (m *MockGPool) Name() string {
return m.name
}
Loading

0 comments on commit 4a5a447

Please sign in to comment.