
Commit

Adjust the store limit according to the cluster state dynamically (#1902)

Adjust the store limit according to the cluster state dynamically

One of the main jobs of PD is to keep data balanced across all TiKV nodes. The scheduler in PD moves regions between TiKV nodes when an imbalance is detected. When a TiKV node is added or removed, an imbalance occurs and the scheduler tries to move a large amount of data. To avoid overloading a TiKV node, PD limits the speed at which regions are moved for each TiKV node; this strategy is called store-limit.

The store-limit is a fixed value, 15 regions per minute by default. This value is conservative: it prevents TiKV from being overloaded in almost all cases. However, it is too low for an idle cluster with little or no load, so balancing the data after adding or removing a node takes too long. This PR solves the problem by measuring the cluster state and adjusting the store-limit dynamically according to that state.
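To make the mechanism concrete, here is a minimal, self-contained sketch of the idea only: classify the cluster into a load state and pick a per-scene region budget from it. The sceneLimit helper and every number except the 15-regions-per-minute default are hypothetical; this is not the PR's actual StoreLimiter implementation.

package main

import "fmt"

// LoadState mirrors the states introduced in cluster_stat.go below.
type LoadState int

const (
	LoadStateNone LoadState = iota
	LoadStateIdle
	LoadStateLow
	LoadStateNormal
	LoadStateHigh
)

// StoreLimitScene holds one regions-per-minute budget per load state,
// following the shape of the StoreLimitScene type added to api.raml below.
type StoreLimitScene struct {
	Idle, Low, Normal, High int
}

// sceneLimit picks the budget for the current load state: an idle cluster
// is allowed to move regions much faster than a heavily loaded one.
func sceneLimit(state LoadState, scene StoreLimitScene) int {
	switch state {
	case LoadStateIdle:
		return scene.Idle
	case LoadStateLow:
		return scene.Low
	case LoadStateHigh:
		return scene.High
	default:
		return scene.Normal
	}
}

func main() {
	// Illustrative numbers; only 15 (the conservative default) comes from the description above.
	scene := StoreLimitScene{Idle: 100, Low: 50, Normal: 30, High: 15}
	fmt.Println("idle cluster budget:", sceneLimit(LoadStateIdle, scene))
	fmt.Println("busy cluster budget:", sceneLimit(LoadStateHigh, scene))
}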

Signed-off-by: Shafreeck Sea <shafreeck@gmail.com>

Co-authored-by: pingcap-github-bot <sre-bot@pingcap.com>
Co-authored-by: ShuNing <nolouch@gmail.com>
3 people authored Dec 30, 2019
1 parent a607642 commit 73e4f0d
Showing 18 changed files with 494 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/api.html

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions server/api/api.raml
@@ -400,6 +400,13 @@ types:
type: string
enum: [ in, notIn, exists, notExists ]
values?: string[]
StoreLimitScene:
type: object
properties:
idle: integer
low: integer
normal: integer
high: integer

/cluster/status:
description: Cluster status.
@@ -764,6 +771,27 @@ types:
type: Stores
500:
description: PD server failed to proceed the request.
/limit/scene:
description: Get or update the store limit for scenes
get:
description: Get the store limit for scenes
responses:
200:
body:
application/json:
type: StoreLimitScene
500:
description: PD server failed to proceed the request.
post:
description: Update the store limit for scenes
body:
application/json:
type: StoreLimitScene
responses:
200:
description: The store limit for specific scenes is updated
500:
description: PD server failed to proceed the request.

/limit:
description: The balance rate limit for all stores.
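A hedged usage sketch of the new endpoint from a Go client follows. It assumes PD is reachable at http://127.0.0.1:2379 and serves its HTTP API under the usual /pd/api/v1 prefix; the JSON field names follow the StoreLimitScene type above, and the values in the POST body are illustrative rather than recommended.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed PD address and API prefix; adjust for your deployment.
	url := "http://127.0.0.1:2379/pd/api/v1/stores/limit/scene"

	// GET returns the current per-scene store limits as JSON.
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()
	fmt.Println("current scene limits:", string(body))

	// POST updates them; the numbers here are illustrative only.
	payload := []byte(`{"idle": 100, "low": 50, "normal": 30, "high": 15}`)
	resp, err = http.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("update status:", resp.Status)
}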
2 changes: 2 additions & 0 deletions server/api/router.go
@@ -94,6 +94,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
clusterRouter.HandleFunc("/api/v1/stores/remove-tombstone", storesHandler.RemoveTombStone).Methods("DELETE")
clusterRouter.HandleFunc("/api/v1/stores/limit", storesHandler.GetAllLimit).Methods("GET")
clusterRouter.HandleFunc("/api/v1/stores/limit", storesHandler.SetAllLimit).Methods("POST")
clusterRouter.HandleFunc("/api/v1/stores/limit/scene", storesHandler.SetStoreLimitScene).Methods("POST")
clusterRouter.HandleFunc("/api/v1/stores/limit/scene", storesHandler.GetStoreLimitScene).Methods("GET")

labelsHandler := newLabelsHandler(svr, rd)
clusterRouter.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
17 changes: 15 additions & 2 deletions server/api/store.go
@@ -28,7 +28,6 @@ import (
"github.com/pingcap/pd/server/config"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pkg/errors"
"github.com/unrolled/render"
)
@@ -377,14 +376,28 @@ func (h *storesHandler) GetAllLimit(w http.ResponseWriter, r *http.Request) {
resp := make(map[uint64]*LimitResp)
for s, l := range limits {
resp[s] = &LimitResp{
Rate: l.Rate() / float64(operator.RegionInfluence) * schedule.StoreBalanceBaseTime,
Rate: l.Rate() * schedule.StoreBalanceBaseTime,
Mode: l.Mode().String(),
}
}

h.rd.JSON(w, http.StatusOK, resp)
}

func (h *storesHandler) SetStoreLimitScene(w http.ResponseWriter, r *http.Request) {
scene := h.Handler.GetStoreLimitScene()
if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &scene); err != nil {
return
}
h.Handler.SetStoreLimitScene(scene)
h.rd.JSON(w, http.StatusOK, nil)
}

func (h *storesHandler) GetStoreLimitScene(w http.ResponseWriter, r *http.Request) {
scene := h.Handler.GetStoreLimitScene()
h.rd.JSON(w, http.StatusOK, scene)
}

func (h *storesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r.Context())
stores := rc.GetMetaStores()
13 changes: 13 additions & 0 deletions server/cluster/cluster.go
100644 → 100755
@@ -83,6 +83,7 @@ type RaftCluster struct {
opt *config.ScheduleOption
storage *core.Storage
id id.Allocator
limiter *StoreLimiter

prepareChecker *prepareChecker
changedRegions chan *core.RegionInfo
@@ -210,6 +211,7 @@ func (c *RaftCluster) Start(s Server) error {

c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
c.regionStats = statistics.NewRegionStatistics(c.opt)
c.limiter = NewStoreLimiter(c.coordinator.opController)
c.quit = make(chan struct{})

c.wg.Add(3)
@@ -393,6 +395,12 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.core.PutStore(newStore)
c.storesStats.Observe(newStore.GetID(), newStore.GetStoreStats())
c.storesStats.UpdateTotalBytesRate(c.core.GetStores)

// c.limiter is nil before "start" is called
if c.limiter != nil && c.opt.Load().StoreLimitMode == "auto" {
c.limiter.Collect(newStore.GetStoreStats())
}

return nil
}

@@ -1575,6 +1583,11 @@ func (c *RaftCluster) PauseOrResumeScheduler(name string, t int64) error {
return c.coordinator.pauseOrResumeScheduler(name, t)
}

// GetStoreLimiter returns the dynamic adjusting limiter
func (c *RaftCluster) GetStoreLimiter() *StoreLimiter {
return c.limiter
}

// DialClient used to dial http request.
var DialClient = &http.Client{
Timeout: clientTimeout,
98 changes: 63 additions & 35 deletions server/cluster_stat.go → server/cluster/cluster_stat.go
@@ -11,15 +11,18 @@
// See the License for the specific language governing permissions and
// limitations under the License

package server
package cluster

import (
"strings"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/pkg/slice"
"github.com/pingcap/pd/server/statistics"
"go.uber.org/zap"
)

// Cluster State Statistics
@@ -91,29 +94,38 @@ func (s LoadState) String() string {
return "none"
}

// ThreadsCollected filters the threads that are taken into
// account in the calculation of CPU usage.
var ThreadsCollected = []string{"grpc-server-"}

// NumberOfEntries is the max number of StatEntry records that are preserved;
// it is the history of a store's heartbeats. The interval of store
// heartbeats from TiKV is 10s, so preserving 30 entries per store
// covers about 5 minutes.
const NumberOfEntries = 30

// StaleEntriesTimeout is the time before an entry is deleted as stale.
// It is about 30 entries * 10s
const StaleEntriesTimeout = 300 * time.Second

// StatEntry is an entry of store statistics
type StatEntry pdpb.StoreStats

// CPUStatEntries saves a history of store statistics
type CPUStatEntries struct {
cpu statistics.MovingAvg
// CPUEntries saves a history of store statistics
type CPUEntries struct {
cpu statistics.MovingAvg
updated time.Time
}

// NewCPUStatEntries returns the StateEntries with a fixed size
func NewCPUStatEntries(size int) *CPUStatEntries {
return &CPUStatEntries{
// NewCPUEntries returns the StateEntries with a fixed size
func NewCPUEntries(size int) *CPUEntries {
return &CPUEntries{
cpu: statistics.NewMedianFilter(size),
}
}

// Append a StatEntry, it accepts an optional threads as a filter of CPU usage
func (s *CPUStatEntries) Append(stat *StatEntry, threads ...string) bool {
func (s *CPUEntries) Append(stat *StatEntry, threads ...string) bool {
usages := stat.CpuUsages
// all gRPC fields are optional, so we must check the empty value
if usages == nil {
@@ -135,34 +147,37 @@ func (s *CPUStatEntries) Append(stat *StatEntry, threads ...string) bool {
}
if appended > 0 {
s.cpu.Add(cpu / float64(appended))
s.updated = time.Now()
return true
}
return false
}

// CPU returns the cpu usage
func (s *CPUStatEntries) CPU() float64 {
func (s *CPUEntries) CPU() float64 {
return s.cpu.Get()
}

// ClusterStatEntries saves the StatEntries for each store in the cluster
type ClusterStatEntries struct {
// StatEntries saves the StatEntries for each store in the cluster
type StatEntries struct {
m sync.RWMutex
stats map[uint64]*CPUStatEntries
stats map[uint64]*CPUEntries
size int // size of entries to keep for each store
total int64 // total of StatEntry appended
ttl time.Duration
}

// NewClusterStatEntries returns a statistics object for the cluster
func NewClusterStatEntries(size int) *ClusterStatEntries {
return &ClusterStatEntries{
stats: make(map[uint64]*CPUStatEntries),
// NewStatEntries returns a statistics object for the cluster
func NewStatEntries(size int) *StatEntries {
return &StatEntries{
stats: make(map[uint64]*CPUEntries),
size: size,
ttl: StaleEntriesTimeout,
}
}

// Append a store StatEntry
func (cst *ClusterStatEntries) Append(stat *StatEntry) {
func (cst *StatEntries) Append(stat *StatEntry) bool {
cst.m.Lock()
defer cst.m.Unlock()

@@ -172,11 +187,11 @@ func (cst *ClusterStatEntries) Append(stat *StatEntry) {
storeID := stat.StoreId
entries, ok := cst.stats[storeID]
if !ok {
entries = NewCPUStatEntries(cst.size)
entries = NewCPUEntries(cst.size)
cst.stats[storeID] = entries
}

entries.Append(stat)
return entries.Append(stat, ThreadsCollected...)
}

func contains(slice []uint64, value uint64) bool {
@@ -189,9 +204,9 @@ func contains(slice []uint64, value uint64) bool {
}

// CPU returns the cpu usage of the cluster
func (cst *ClusterStatEntries) CPU(excludes ...uint64) float64 {
cst.m.RLock()
defer cst.m.RUnlock()
func (cst *StatEntries) CPU(excludes ...uint64) float64 {
cst.m.Lock()
defer cst.m.Unlock()

// no entries have been collected
if cst.total == 0 {
@@ -203,50 +218,63 @@ func (cst *ClusterStatEntries) CPU(excludes ...uint64) float64 {
if contains(excludes, sid) {
continue
}
if time.Since(stat.updated) > cst.ttl {
delete(cst.stats, sid)
continue
}
sum += stat.CPU()
}
if len(cst.stats) == 0 {
return 0.0
}
return sum / float64(len(cst.stats))
}

// ClusterState collects information from store heartbeat
// State collects information from store heartbeat
// and calculates the load state of the cluster
type ClusterState struct {
cst *ClusterStatEntries
type State struct {
cst *StatEntries
}

// NewClusterState return the ClusterState object which collects
// NewState return the LoadState object which collects
// information from store heartbeats and gives the current state of
// the cluster
func NewClusterState() *ClusterState {
return &ClusterState{
cst: NewClusterStatEntries(NumberOfEntries),
func NewState() *State {
return &State{
cst: NewStatEntries(NumberOfEntries),
}
}

// State returns the state of the cluster, excludes is the list of store ID
// to be excluded
func (cs *ClusterState) State(excludes ...uint64) LoadState {
func (cs *State) State(excludes ...uint64) LoadState {
// Return LoadStateNone if there is not enough heartbeats
// collected.
if cs.cst.total < NumberOfEntries {
return LoadStateNone
}

// The CPU usage in fact is collected from grpc-server, so it is not the
// CPU usage for the whole TiKV process. The boundaries are empirical
// values.
// TODO we may get a more accurate state with the information of the number
// of the CPU cores
cpu := cs.cst.CPU(excludes...)
log.Debug("calculated cpu", zap.Float64("usage", cpu))
clusterStateCPUGuage.Set(cpu)
switch {
case cpu == 0:
case cpu < 5:
return LoadStateIdle
case cpu > 0 && cpu < 30:
case cpu >= 5 && cpu < 10:
return LoadStateLow
case cpu >= 30 && cpu < 80:
case cpu >= 10 && cpu < 30:
return LoadStateNormal
case cpu >= 80:
case cpu >= 30:
return LoadStateHigh
}
return LoadStateNone
}

// Collect statistics from store heartbeat
func (cs *ClusterState) Collect(stat *StatEntry) {
func (cs *State) Collect(stat *StatEntry) {
cs.cst.Append(stat)
}
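As a standalone illustration of how a single store's CPU sample could be derived from a heartbeat (the actual Append body is partially collapsed above): average the usages of the threads whose names match the ThreadsCollected filter. Prefix matching is an assumption here, suggested by the trailing dash in "grpc-server-" and the strings import; the resulting sample then feeds the per-store median filter, and the cluster-wide average is classified by the boundaries in State().

package main

import (
	"fmt"
	"strings"
)

// grpcCPU averages the CPU usage of the threads whose names start with one of
// the given prefixes; it reports false when no thread matched, in which case
// the entry would not be appended.
func grpcCPU(usages map[string]float64, prefixes ...string) (float64, bool) {
	sum, matched := 0.0, 0
	for name, usage := range usages {
		for _, p := range prefixes {
			if strings.HasPrefix(name, p) {
				sum += usage
				matched++
				break
			}
		}
	}
	if matched == 0 {
		return 0, false
	}
	return sum / float64(matched), true
}

func main() {
	// A made-up heartbeat: two gRPC server threads plus an unrelated one.
	usages := map[string]float64{
		"grpc-server-0": 12.0,
		"grpc-server-1": 18.0,
		"rocksdb:bg0":   40.0, // ignored by the filter
	}
	cpu, ok := grpcCPU(usages, "grpc-server-")
	fmt.Println(cpu, ok) // 15 true
}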
