statistics: pd can calculate the hot degree of the buckets. (#4727)
close #4726

Signed-off-by: bufferflies <1045931706@qq.com>
bufferflies authored May 19, 2022
1 parent 563a93d commit 61842c3
Showing 9 changed files with 769 additions and 7 deletions.
17 changes: 17 additions & 0 deletions pkg/keyutil/util.go
@@ -15,6 +15,7 @@
package keyutil

import (
"bytes"
"encoding/hex"
"fmt"
)
@@ -23,3 +24,19 @@ import (
func BuildKeyRangeKey(startKey, endKey []byte) string {
return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}

// MaxKey returns the larger of the given keys.
func MaxKey(a, b []byte) []byte {
if bytes.Compare(a, b) > 0 {
return a
}
return b
}

// MinKey returns the smaller of the given keys.
func MinKey(a, b []byte) []byte {
if bytes.Compare(a, b) > 0 {
return b
}
return a
}
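
For reference, a minimal standalone sketch (not part of the diff) of how these helpers are used later in this commit: the intersection of two key ranges [a1, b1) and [a2, b2) is [MaxKey(a1, a2), MinKey(b1, b2)), and it is non-empty only when the left bound compares less than the right one.

package main

import (
	"bytes"
	"fmt"

	"github.com/tikv/pd/pkg/keyutil"
)

func main() {
	// the ranges ["10", "20") and ["05", "15") intersect in ["10", "15")
	left := keyutil.MaxKey([]byte("10"), []byte("05"))
	right := keyutil.MinKey([]byte("20"), []byte("15"))
	if bytes.Compare(left, right) < 0 {
		fmt.Printf("intersection: [%s, %s)\n", left, right)
	}
}
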
4 changes: 3 additions & 1 deletion pkg/rangetree/range_tree.go
@@ -51,7 +51,9 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem {
r.tree.Delete(old)
children := r.factory(item.GetStartKey(), item.GetEndKey(), old)
for _, child := range children {
-		if bytes.Compare(child.GetStartKey(), child.GetEndKey()) < 0 {
+		if c := bytes.Compare(child.GetStartKey(), child.GetEndKey()); c < 0 {
+			r.tree.ReplaceOrInsert(child)
+		} else if c > 0 && len(child.GetEndKey()) == 0 {
r.tree.ReplaceOrInsert(child)
}
}
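
The new branch keeps a child item whose end key is empty, since an empty end key marks the last, unbounded range. A rough standalone sketch of the predicate, using an illustrative name (keepChild) that does not exist in the repository:

package main

import (
	"bytes"
	"fmt"
)

// keepChild mirrors the updated condition: keep a range when start < end,
// or when the end key is empty, which stands for the unbounded upper bound.
func keepChild(start, end []byte) bool {
	c := bytes.Compare(start, end)
	return c < 0 || (c > 0 && len(end) == 0)
}

func main() {
	fmt.Println(keepChild([]byte("a"), []byte("b"))) // true
	fmt.Println(keepChild([]byte("a"), nil))         // true: ["a", +inf)
	fmt.Println(keepChild([]byte("a"), []byte("a"))) // false: empty range
}
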
2 changes: 2 additions & 0 deletions server/cluster/cluster.go
@@ -131,6 +131,7 @@ type RaftCluster struct {
labelLevelStats *statistics.LabelStatistics
regionStats *statistics.RegionStatistics
hotStat *statistics.HotStat
hotBuckets *buckets.HotBucketCache
ruleManager *placement.RuleManager
regionLabeler *labeler.RegionLabeler
replicationMode *replication.ModeManager
@@ -221,6 +222,7 @@ func (c *RaftCluster) InitCluster(
c.ctx, c.cancel = context.WithCancel(c.serverCtx)
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat(c.ctx)
c.hotBuckets = buckets.NewBucketsCache(c.ctx)
c.progressManager = progress.NewManager()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64)
9 changes: 7 additions & 2 deletions server/cluster/cluster_worker.go
@@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/statistics/buckets"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
)
@@ -235,6 +236,10 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
}

// HandleReportBuckets processes buckets reports from client
-func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
-	return c.processReportBuckets(buckets)
+func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
+	if err := c.processReportBuckets(b); err != nil {
+		return err
+	}
+	c.hotBuckets.CheckAsync(buckets.NewCheckPeerTask(b))
+	return nil
}
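
CheckAsync hands the reported buckets to the hot-bucket cache without blocking the heartbeat path. Below is a rough sketch of that producer/consumer shape under assumed names (checkTask, asyncChecker, taskQueue); it is not the real HotBucketCache implementation:

package main

import (
	"context"
	"fmt"
	"time"
)

// checkTask is a hypothetical stand-in for buckets.NewCheckPeerTask: it only
// carries the data the background checker needs.
type checkTask struct {
	regionID uint64
}

// asyncChecker is a hypothetical stand-in for the hot-bucket cache: callers
// enqueue tasks and a single goroutine drains them, so the heartbeat handler
// never does the bucket bookkeeping itself.
type asyncChecker struct {
	taskQueue chan *checkTask
}

func newAsyncChecker(ctx context.Context) *asyncChecker {
	c := &asyncChecker{taskQueue: make(chan *checkTask, 256)}
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case t := <-c.taskQueue:
				fmt.Println("checking buckets of region", t.regionID)
			}
		}
	}()
	return c
}

// CheckAsync enqueues a task and returns immediately.
func (c *asyncChecker) CheckAsync(t *checkTask) {
	c.taskQueue <- t
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	checker := newAsyncChecker(ctx)
	checker.CheckAsync(&checkTask{regionID: 1})
	time.Sleep(10 * time.Millisecond) // let the toy background loop run once
}
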
180 changes: 179 additions & 1 deletion server/statistics/buckets/bucket_stat_informer.go
@@ -14,6 +14,26 @@

package buckets

import (
"bytes"
"fmt"

"github.com/tikv/pd/pkg/btree"
"github.com/tikv/pd/pkg/keyutil"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/statistics"
)

var minHotThresholds = [statistics.RegionStatCount]uint64{
statistics.RegionReadBytes: 8 * 1024,
statistics.RegionReadKeys: 128,
statistics.RegionReadQuery: 128,
statistics.RegionWriteBytes: 1 * 1024,
statistics.RegionWriteKeys: 32,
statistics.RegionWriteQuery: 32,
}

// BucketStatInformer is used to get the bucket statistics.
type BucketStatInformer interface {
BucketsStats(degree int) map[uint64][]*BucketStat
@@ -26,6 +46,164 @@ type BucketStat struct {
EndKey []byte
HotDegree int
Interval uint64
-// see statistics.RegionStatKind
+// the order follows statistics.RegionStatKind
Loads []uint64
}

func (b *BucketStat) clone() *BucketStat {
c := &BucketStat{
RegionID: b.RegionID,
HotDegree: b.HotDegree,
Interval: b.Interval,
StartKey: b.StartKey,
EndKey: b.EndKey,
Loads: make([]uint64, len(b.Loads)),
}
copy(c.Loads, b.Loads)
return c
}

func (b *BucketStat) String() string {
return fmt.Sprintf("[region-id:%d][start-key:%s][end-key-key:%s][hot-degree:%d][Interval:%d(ms)][Loads:%v]",
b.RegionID, core.HexRegionKeyStr(b.StartKey), core.HexRegionKeyStr(b.EndKey), b.HotDegree, b.Interval, b.Loads)
}

// BucketTreeItem is the item of the bucket btree.
type BucketTreeItem struct {
regionID uint64
startKey []byte
endKey []byte
stats []*BucketStat
interval uint64
version uint64
status status
}

// GetStartKey returns the start key of the bucket tree item.
func (b *BucketTreeItem) GetStartKey() []byte {
return b.startKey
}

// GetEndKey returns the end key of the bucket tree item.
func (b *BucketTreeItem) GetEndKey() []byte {
return b.endKey
}

// String implements the fmt.Stringer interface.
func (b *BucketTreeItem) String() string {
return fmt.Sprintf("[region-id:%d][start-key:%s][end-key:%s]",
b.regionID, core.HexRegionKeyStr(b.startKey), core.HexRegionKeyStr(b.endKey))
}

// Less returns true if the start key is less than the other.
func (b *BucketTreeItem) Less(than btree.Item) bool {
return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0
}

// equals returns true if the item has the same key range as the origin.
func (b *BucketTreeItem) equals(origin *BucketTreeItem) bool {
if origin == nil {
return false
}
return bytes.Equal(b.startKey, origin.startKey) && bytes.Equal(b.endKey, origin.endKey)
}

// cloneBucketItemByRange returns a new item restricted to the given key range,
// keeping only the stats that overlap with it.
func (b *BucketTreeItem) cloneBucketItemByRange(startKey, endKey []byte) *BucketTreeItem {
item := &BucketTreeItem{
regionID: b.regionID,
startKey: startKey,
endKey: endKey,
interval: b.interval,
version: b.version,
stats: make([]*BucketStat, 0, len(b.stats)),
status: archive,
}

for _, stat := range b.stats {
// keep the stat if it overlaps with the given key range.
left := keyutil.MaxKey(stat.StartKey, startKey)
right := keyutil.MinKey(stat.EndKey, endKey)
if len(endKey) == 0 {
right = stat.EndKey
}
if bytes.Compare(left, right) < 0 {
copy := stat.clone()
copy.StartKey = left
copy.EndKey = right
item.stats = append(item.stats, copy)
}
}
return item
}
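
// Illustrative sketch, not part of this commit: exampleCloneByRange is a
// hypothetical helper showing how an item covering ["a", "z") with two stats
// is clipped to the sub-range ["c", "f"); only the overlapping portion of each
// stat survives, with its boundaries clamped by keyutil.MaxKey/keyutil.MinKey.
func exampleCloneByRange() *BucketTreeItem {
	item := &BucketTreeItem{
		regionID: 1,
		startKey: []byte("a"),
		endKey:   []byte("z"),
		stats: []*BucketStat{
			{RegionID: 1, StartKey: []byte("a"), EndKey: []byte("e")},
			{RegionID: 1, StartKey: []byte("e"), EndKey: []byte("z")},
		},
	}
	// the result keeps two clipped stats: ["c", "e") and ["e", "f")
	return item.cloneBucketItemByRange([]byte("c"), []byte("f"))
}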

// inherit carries the hot stats from the old items over to the new item.
// rule1: if the overlapping buckets are hot, it inherits the hottest one.
// rule2: if the overlapping buckets are not hot, it inherits the coldest one.
// rule3: if some overlapping buckets are hot and the others are cold, it inherits the hottest one.
func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) {
if len(origins) == 0 || len(b.stats) == 0 || bytes.Compare(b.endKey, origins[0].startKey) < 0 {
return
}

newItems := b.stats
oldItems := make([]*BucketStat, 0)
for _, bucketTree := range origins {
oldItems = append(oldItems, bucketTree.stats...)
}
// newItems and oldItems are two lists of key intervals, where items[i]=[start-key,end-key];
// the items in each list are disjoint and sorted by key.
// Whenever a new item intersects an old one, the hot degree is carried over.
for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); {
newItem, oldItem := newItems[p1], oldItems[p2]
left := keyutil.MaxKey(newItem.StartKey, oldItem.StartKey)
right := keyutil.MinKey(newItem.EndKey, oldItem.EndKey)

// a bucket inherits the old bucket's hot degree if they intersect.
// skip if the left key equals the right key, such as [10 20] and [20 30].
// new bucket: |10 ---- 20 |
// old bucket: | 5 ---------15|
// they have one intersection |10--15|.
if bytes.Compare(left, right) < 0 || len(right) == 0 {
oldDegree := oldItem.HotDegree
newDegree := newItem.HotDegree
// if both degrees are non-positive and the old one is lower, the new bucket inherits the old (colder) degree.
if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree {
newItem.HotDegree = oldDegree
}
// if oldDegree is positive and greater than the new degree, the new bucket inherits the old hot degree.
if oldDegree > 0 && oldDegree > newDegree {
newItem.HotDegree = oldDegree
}
}
// move the left item to the next, old should move first if they are equal.
if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 || len(newItem.EndKey) == 0 {
p2++
} else {
p1++
}
}
}
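
// Illustrative sketch, not part of this commit: exampleInherit is a
// hypothetical helper showing rule 1 above. One hot old bucket ["b", "d") with
// degree 4 overlaps both new buckets, so both of them inherit that degree.
func exampleInherit() []*BucketStat {
	newItem := &BucketTreeItem{
		startKey: []byte("a"),
		endKey:   []byte("f"),
		stats: []*BucketStat{
			{StartKey: []byte("a"), EndKey: []byte("c")},
			{StartKey: []byte("c"), EndKey: []byte("f")},
		},
	}
	old := &BucketTreeItem{
		startKey: []byte("a"),
		endKey:   []byte("f"),
		stats:    []*BucketStat{{StartKey: []byte("b"), EndKey: []byte("d"), HotDegree: 4}},
	}
	newItem.inherit([]*BucketTreeItem{old})
	return newItem.stats // both stats now have HotDegree == 4
}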

func (b *BucketTreeItem) calculateHotDegree() {
for _, stat := range b.stats {
// TODO: qps should be considered; TiKV will report it in the next sprint.
// the load order is: read [bytes keys qps] and write [bytes keys qps]
readLoads := stat.Loads[:2]
readHot := slice.AllOf(readLoads, func(i int) bool {
return readLoads[i] > minHotThresholds[i]
})
writeLoads := stat.Loads[3:5]
writeHot := slice.AllOf(writeLoads, func(i int) bool {
return writeLoads[i] > minHotThresholds[3+i]
})
hot := readHot || writeHot
if hot && stat.HotDegree < maxHotDegree {
stat.HotDegree++
}
if !hot && stat.HotDegree > minHotDegree {
stat.HotDegree--
}
}
}
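
// Illustrative sketch, not part of this commit: exampleCalculateHotDegree is a
// hypothetical helper. A bucket whose read bytes/keys both exceed
// minHotThresholds gains one hot degree per report, up to the cap maxHotDegree
// defined elsewhere in this package (assumed here to be at least 2); a cold
// bucket loses one per report, down to minHotDegree.
func exampleCalculateHotDegree() int {
	item := &BucketTreeItem{
		stats: []*BucketStat{{
			// load order: read bytes, keys, qps, then write bytes, keys, qps
			Loads: []uint64{16 * 1024, 256, 0, 0, 0, 0},
		}},
	}
	item.calculateHotDegree() // 16KiB > 8KiB and 256 > 128, so the degree becomes 1
	item.calculateHotDegree() // still hot, so the degree becomes 2
	return item.stats[0].HotDegree
}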
(The remaining changed files in this commit are not shown.)
