server: implement for BucketStatInformer (#4992)
close #4991

Signed-off-by: bufferflies <1045931706@qq.com>
bufferflies authored May 19, 2022
1 parent 61842c3 commit 9f1894a
Showing 9 changed files with 262 additions and 3 deletions.
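
The core of the change is an asynchronous query path from BucketsStats into the hot bucket cache: the caller creates a collect task carrying a one-slot buffered reply channel, enqueues it with CheckAsync, and waits in WaitRet until the cache's schedule goroutine replies or the context is cancelled. The sketch below is a minimal, self-contained illustration of that pattern; the names (stat, collectTask, cache, checkAsync, waitRet) are illustrative stand-ins, not PD's API; the real entry points are buckets.NewCollectBucketStatsTask, HotBucketCache.CheckAsync, and WaitRet as shown in the diff.

package main

import (
	"context"
	"fmt"
	"time"
)

// stat stands in for buckets.BucketStat in this sketch.
type stat struct {
	regionID  uint64
	hotDegree int
}

// collectTask mirrors the shape of collectBucketStatsTask: a minimum degree
// plus a buffered reply channel so the worker can answer without blocking.
type collectTask struct {
	minDegree int
	ret       chan []stat
}

// waitRet mirrors WaitRet: it returns nil if the context ends first.
func (t *collectTask) waitRet(ctx context.Context) []stat {
	select {
	case <-ctx.Done():
		return nil
	case r := <-t.ret:
		return r
	}
}

// cache mirrors HotBucketCache's single scheduler goroutine: every read and
// write of stats happens inside schedule, so no extra locking is needed.
type cache struct {
	taskQueue chan *collectTask
	stats     []stat
}

// checkAsync enqueues a task; it reports false instead of blocking the caller
// when the queue is full, which is the behavior BucketsStats relies on.
func (c *cache) checkAsync(t *collectTask) bool {
	select {
	case c.taskQueue <- t:
		return true
	default:
		return false
	}
}

func (c *cache) schedule(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case t := <-c.taskQueue:
			out := make([]stat, 0)
			for _, s := range c.stats {
				if s.hotDegree >= t.minDegree {
					out = append(out, s)
				}
			}
			t.ret <- out // capacity-1 channel, never blocks here
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	c := &cache{
		taskQueue: make(chan *collectTask, 1024),
		stats:     []stat{{regionID: 1, hotDegree: 3}, {regionID: 2, hotDegree: -1}},
	}
	go c.schedule(ctx)

	task := &collectTask{minDegree: 1, ret: make(chan []stat, 1)}
	if c.checkAsync(task) {
		fmt.Println(task.waitRet(ctx)) // prints [{1 3}]
	}
}

Because the reply channel is buffered with capacity 1, the schedule loop can always deliver its answer without blocking, even if the caller has already returned on ctx.Done(); that is why WaitRet can safely bail out on cancellation.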
10 changes: 9 additions & 1 deletion pkg/mock/mockcluster/mockcluster.go
@@ -55,6 +55,8 @@ type Cluster struct {
ID uint64
suspectRegions map[uint64]struct{}
*config.StoreConfigManager
*buckets.HotBucketCache
ctx context.Context
}

// NewCluster creates a new Cluster
@@ -63,9 +65,11 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(ctx),
HotBucketCache: buckets.NewBucketsCache(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
StoreConfigManager: config.NewTestStoreConfigManager(nil),
ctx: ctx,
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
@@ -133,7 +137,11 @@ func (mc *Cluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {

// BucketsStats returns hot region's buckets stats.
func (mc *Cluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat {
-return nil
+task := buckets.NewCollectBucketStatsTask(degree)
+if !mc.HotBucketCache.CheckAsync(task) {
+return nil
+}
+return task.WaitRet(mc.ctx)
}

// RegionWriteStats returns hot region's write stats.
6 changes: 5 additions & 1 deletion server/cluster/cluster.go
@@ -1925,7 +1925,11 @@ func (c *RaftCluster) RegionReadStats() map[uint64][]*statistics.HotPeerStat {

// BucketsStats returns hot region's buckets stats.
func (c *RaftCluster) BucketsStats(degree int) map[uint64][]*buckets.BucketStat {
-return nil
+task := buckets.NewCollectBucketStatsTask(degree)
+if !c.hotBuckets.CheckAsync(task) {
+return nil
+}
+return task.WaitRet(c.ctx)
}

// RegionWriteStats returns hot region's write stats.
1 change: 1 addition & 0 deletions server/grpc_service.go
@@ -779,6 +779,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()
continue
}
bucketReportInterval.WithLabelValues(storeAddress, storeLabel).Observe(float64(buckets.GetPeriodInMs() / 1000))
bucketReportLatency.WithLabelValues(storeAddress, storeLabel).Observe(time.Since(start).Seconds())
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "ok").Inc()
}
10 changes: 10 additions & 0 deletions server/metrics.go
@@ -100,6 +100,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 29), // 0.1ms ~ 7hours
}, []string{"address", "store"})

bucketReportInterval = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "server",
Name: "bucket_report_interval_seconds",
Help: "Bucketed histogram of processing time (s) of handled bucket report requests.",
Buckets: prometheus.LinearBuckets(0, 30, 20), // 1s ~ 17m
}, []string{"address", "store"})

regionHeartbeatHandleDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
@@ -151,4 +160,5 @@ func init() {
prometheus.MustRegister(bucketReportCounter)
prometheus.MustRegister(bucketReportLatency)
prometheus.MustRegister(serviceAuditHistogram)
prometheus.MustRegister(bucketReportInterval)
}
7 changes: 7 additions & 0 deletions server/statistics/buckets/bucket_stat_informer.go
@@ -207,3 +207,10 @@ func (b *BucketTreeItem) calculateHotDegree() {
}
}
}

// collectBucketsMetrics collects the hot-degree metrics of the buckets in this tree item.
func (b *BucketTreeItem) collectBucketsMetrics() {
for _, bucket := range b.stats {
bucketsHotDegreeHist.Observe(float64(bucket.HotDegree))
}
}
21 changes: 21 additions & 0 deletions server/statistics/buckets/hot_bucket_cache.go
@@ -17,6 +17,7 @@ package buckets
import (
"bytes"
"context"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
@@ -52,6 +53,23 @@ type HotBucketCache struct {
ctx context.Context
}

// GetHotBucketStats returns the bucket stats of the regions whose hot degree is greater than or equal to the given degree.
func (h *HotBucketCache) GetHotBucketStats(degree int) map[uint64][]*BucketStat {
rst := make(map[uint64][]*BucketStat)
for _, item := range h.bucketsOfRegion {
stats := make([]*BucketStat, 0)
for _, b := range item.stats {
if b.HotDegree >= degree {
stats = append(stats, b)
}
}
if len(stats) > 0 {
rst[item.regionID] = stats
}
}
return rst
}

// bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range.
// start and end key: | 001------------------------200|
// the split key range: |050---150|
@@ -137,7 +155,9 @@ func (h *HotBucketCache) schedule() {
case <-h.ctx.Done():
return
case task := <-h.taskQueue:
start := time.Now()
task.runTask(h)
bucketsTaskDuration.WithLabelValues(task.taskType().String()).Observe(time.Since(start).Seconds())
}
}
}
@@ -156,6 +176,7 @@ func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *Buc
}
newItem.inherit(overlaps)
newItem.calculateHotDegree()
newItem.collectBucketsMetrics()
return newItem, overlaps
}

39 changes: 38 additions & 1 deletion server/statistics/buckets/hot_bucket_task.go
@@ -15,18 +15,24 @@
package buckets

import (
"context"

"github.com/pingcap/kvproto/pkg/metapb"
)

type flowItemTaskKind uint32

const (
checkBucketsTaskType flowItemTaskKind = iota
collectBucketStatsTaskType
)

func (kind flowItemTaskKind) String() string {
-if kind == checkBucketsTaskType {
+switch kind {
+case checkBucketsTaskType:
return "check_buckets"
+case collectBucketStatsTaskType:
+return "collect_bucket_stats"
}
return "unknown"
}
@@ -57,3 +63,34 @@ func (t *checkBucketsTask) runTask(cache *HotBucketCache) {
newItems, overlaps := cache.checkBucketsFlow(t.Buckets)
cache.putItem(newItems, overlaps)
}

type collectBucketStatsTask struct {
minDegree int
ret chan map[uint64][]*BucketStat // RegionID ==> Buckets
}

// NewCollectBucketStatsTask creates task to collect bucket stats.
func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask {
return &collectBucketStatsTask{
minDegree: minDegree,
ret: make(chan map[uint64][]*BucketStat, 1),
}
}

func (t *collectBucketStatsTask) taskType() flowItemTaskKind {
return collectBucketStatsTaskType
}

func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) {
t.ret <- cache.GetHotBucketStats(t.minDegree)
}

// WaitRet returns the result of the task.
func (t *collectBucketStatsTask) WaitRet(ctx context.Context) map[uint64][]*BucketStat {
select {
case <-ctx.Done():
return nil
case ret := <-t.ret:
return ret
}
}
127 changes: 127 additions & 0 deletions server/statistics/buckets/hot_bucket_task_test.go
@@ -0,0 +1,127 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package buckets

import (
"context"
"math"
"strconv"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
)

var _ = Suite(&testHotBucketTaskCache{})

type testHotBucketTaskCache struct {
}

func getAllBucketStats(ctx context.Context, hotCache *HotBucketCache) map[uint64][]*BucketStat {
task := NewCollectBucketStatsTask(minHotDegree)
hotCache.CheckAsync(task)
return task.WaitRet(ctx)
}

func (s *testHotBucketTaskCache) TestColdHot(c *C) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
hotCache := NewBucketsCache(ctx)
testdata := []struct {
buckets *metapb.Buckets
isHot bool
}{{
buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0),
isHot: false,
}, {
buckets: newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, math.MaxUint64),
isHot: true,
}}
for _, v := range testdata {
for i := 0; i < 20; i++ {
task := NewCheckPeerTask(v.buckets)
c.Assert(hotCache.CheckAsync(task), IsTrue)
hotBuckets := getAllBucketStats(ctx, hotCache)
time.Sleep(time.Millisecond * 10)
item := hotBuckets[v.buckets.RegionId]
c.Assert(item, NotNil)
if v.isHot {
c.Assert(item[0].HotDegree, Equals, i+1)
} else {
c.Assert(item[0].HotDegree, Equals, -i-1)
}
}
}
}

func (s *testHotBucketTaskCache) TestCheckBucketsTask(c *C) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
hotCache := NewBucketsCache(ctx)
// case1: add bucket successfully
buckets := newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30")}, 0)
task := NewCheckPeerTask(buckets)
c.Assert(hotCache.CheckAsync(task), IsTrue)
time.Sleep(time.Millisecond * 10)

hotBuckets := getAllBucketStats(ctx, hotCache)
c.Assert(hotBuckets, HasLen, 1)
item := hotBuckets[uint64(1)]
c.Assert(item, NotNil)
c.Assert(item, HasLen, 2)
c.Assert(item[0].HotDegree, Equals, -1)
c.Assert(item[1].HotDegree, Equals, -1)

// case2: add bucket successfully; the hot degree should be inherited from the old one.
buckets = newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, 0)
task = NewCheckPeerTask(buckets)
c.Assert(hotCache.CheckAsync(task), IsTrue)
hotBuckets = getAllBucketStats(ctx, hotCache)
time.Sleep(time.Millisecond * 10)
item = hotBuckets[uint64(2)]
c.Assert(item, HasLen, 1)
c.Assert(item[0].HotDegree, Equals, -2)

// case3: add bucket successfully; the hot degree should be inherited from the old one.
buckets = newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0)
task = NewCheckPeerTask(buckets)
c.Assert(hotCache.CheckAsync(task), IsTrue)
hotBuckets = getAllBucketStats(ctx, hotCache)
time.Sleep(time.Millisecond * 10)
item = hotBuckets[uint64(1)]
c.Assert(item, HasLen, 1)
c.Assert(item[0].HotDegree, Equals, -2)
}

func (s *testHotBucketTaskCache) TestCollectBucketStatsTask(c *C) {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
hotCache := NewBucketsCache(ctx)
// case1: add bucket successfully
for i := uint64(0); i < 10; i++ {
buckets := convertToBucketTreeItem(newTestBuckets(i, 1, [][]byte{[]byte(strconv.FormatUint(i*10, 10)),
[]byte(strconv.FormatUint((i+1)*10, 10))}, 0))
hotCache.putItem(buckets, hotCache.getBucketsByKeyRange(buckets.startKey, buckets.endKey))
}
time.Sleep(time.Millisecond * 10)
task := NewCollectBucketStatsTask(-100)
c.Assert(hotCache.CheckAsync(task), IsTrue)
stats := task.WaitRet(ctx)
c.Assert(stats, HasLen, 10)
task = NewCollectBucketStatsTask(1)
c.Assert(hotCache.CheckAsync(task), IsTrue)
stats = task.WaitRet(ctx)
c.Assert(stats, HasLen, 0)
}
44 changes: 44 additions & 0 deletions server/statistics/buckets/metric.go
@@ -0,0 +1,44 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package buckets

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
bucketsHotDegreeHist = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "buckets_hot_degree_hist",
Help: "Bucketed histogram of bucket hot degree",
Buckets: prometheus.LinearBuckets(-20, 2, 20), // -20 ~ 18
})

bucketsTaskDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "scheduler",
Name: "bucket_task_duration",
Help: "Bucketed histogram of processing time (s) of bucket task.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 23), // 0.1ms ~ 7min
}, []string{"type"})
)

func init() {
prometheus.MustRegister(bucketsHotDegreeHist)
prometheus.MustRegister(bucketsTaskDuration)
}
