diff --git a/pkg/keyutil/util.go b/pkg/keyutil/util.go
index ed2e07fa9c5..196369bdef2 100644
--- a/pkg/keyutil/util.go
+++ b/pkg/keyutil/util.go
@@ -15,6 +15,7 @@ package keyutil
 import (
+	"bytes"
 	"encoding/hex"
 	"fmt"
 )
@@ -23,3 +24,19 @@ import (
 func BuildKeyRangeKey(startKey, endKey []byte) string {
 	return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
 }
+
+// MaxKey returns the larger one of the given keys.
+func MaxKey(a, b []byte) []byte {
+	if bytes.Compare(a, b) > 0 {
+		return a
+	}
+	return b
+}
+
+// MinKey returns the smaller one of the given keys.
+func MinKey(a, b []byte) []byte {
+	if bytes.Compare(a, b) > 0 {
+		return b
+	}
+	return a
+}
diff --git a/pkg/rangetree/range_tree.go b/pkg/rangetree/range_tree.go
index d647e1e78d0..47d7e960a0e 100644
--- a/pkg/rangetree/range_tree.go
+++ b/pkg/rangetree/range_tree.go
@@ -51,7 +51,9 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem {
 		r.tree.Delete(old)
 		children := r.factory(item.GetStartKey(), item.GetEndKey(), old)
 		for _, child := range children {
-			if bytes.Compare(child.GetStartKey(), child.GetEndKey()) < 0 {
+			if c := bytes.Compare(child.GetStartKey(), child.GetEndKey()); c < 0 {
+				r.tree.ReplaceOrInsert(child)
+			} else if c > 0 && len(child.GetEndKey()) == 0 {
 				r.tree.ReplaceOrInsert(child)
 			}
 		}
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 71242ea78fc..b7271101dcd 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -131,6 +131,7 @@ type RaftCluster struct {
 	labelLevelStats *statistics.LabelStatistics
 	regionStats     *statistics.RegionStatistics
 	hotStat         *statistics.HotStat
+	hotBuckets      *buckets.HotBucketCache
 	ruleManager     *placement.RuleManager
 	regionLabeler   *labeler.RegionLabeler
 	replicationMode *replication.ModeManager
@@ -221,6 +222,7 @@ func (c *RaftCluster) InitCluster(
 	c.ctx, c.cancel = context.WithCancel(c.serverCtx)
 	c.labelLevelStats = statistics.NewLabelStatistics()
 	c.hotStat = statistics.NewHotStat(c.ctx)
+	c.hotBuckets = buckets.NewBucketsCache(c.ctx)
 	c.progressManager = progress.NewManager()
 	c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
 	c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64)
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index d48c5bec060..6573c23c70e 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -26,6 +26,7 @@ import (
 	"github.com/tikv/pd/pkg/logutil"
 	"github.com/tikv/pd/server/core"
 	"github.com/tikv/pd/server/schedule"
+	"github.com/tikv/pd/server/statistics/buckets"
 	"github.com/tikv/pd/server/versioninfo"
 	"go.uber.org/zap"
 )
@@ -235,6 +236,10 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
 }
 
 // HandleReportBuckets processes buckets reports from client
-func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
-	return c.processReportBuckets(buckets)
+func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
+	if err := c.processReportBuckets(b); err != nil {
+		return err
+	}
+	c.hotBuckets.CheckAsync(buckets.NewCheckPeerTask(b))
+	return nil
 }
diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go
index 1d95d65e50c..c64ff947917 100644
--- a/server/statistics/buckets/bucket_stat_informer.go
+++ b/server/statistics/buckets/bucket_stat_informer.go
@@ -14,6 +14,26 @@
 
 package buckets
 
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/tikv/pd/pkg/btree"
+	"github.com/tikv/pd/pkg/keyutil"
+	"github.com/tikv/pd/pkg/slice"
+	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/statistics"
+)
+
+var minHotThresholds = [statistics.RegionStatCount]uint64{
+	statistics.RegionReadBytes:  8 * 1024,
+	statistics.RegionReadKeys:   128,
+	statistics.RegionReadQuery:  128,
+	statistics.RegionWriteBytes: 1 * 1024,
+	statistics.RegionWriteKeys:  32,
+	statistics.RegionWriteQuery: 32,
+}
+
 // BucketStatInformer is used to get the bucket statistics.
 type BucketStatInformer interface {
 	BucketsStats(degree int) map[uint64][]*BucketStat
@@ -26,6 +46,164 @@ type BucketStat struct {
 	EndKey    []byte
 	HotDegree int
 	Interval  uint64
-	// see statistics.RegionStatKind
+	// the loads are ordered as defined by statistics.RegionStatKind
 	Loads []uint64
 }
+
+func (b *BucketStat) clone() *BucketStat {
+	c := &BucketStat{
+		RegionID:  b.RegionID,
+		HotDegree: b.HotDegree,
+		Interval:  b.Interval,
+		StartKey:  b.StartKey,
+		EndKey:    b.EndKey,
+		Loads:     make([]uint64, len(b.Loads)),
+	}
+	copy(c.Loads, b.Loads)
+	return c
+}
+
+func (b *BucketStat) String() string {
+	return fmt.Sprintf("[region-id:%d][start-key:%s][end-key:%s][hot-degree:%d][Interval:%d(ms)][Loads:%v]",
+		b.RegionID, core.HexRegionKeyStr(b.StartKey), core.HexRegionKeyStr(b.EndKey), b.HotDegree, b.Interval, b.Loads)
+}
+
+// BucketTreeItem is the item of the bucket btree.
+type BucketTreeItem struct {
+	regionID uint64
+	startKey []byte
+	endKey   []byte
+	stats    []*BucketStat
+	interval uint64
+	version  uint64
+	status   status
+}
+
+// GetStartKey returns the start key of the bucket tree item.
+func (b *BucketTreeItem) GetStartKey() []byte {
+	return b.startKey
+}
+
+// GetEndKey returns the end key of the bucket tree item.
+func (b *BucketTreeItem) GetEndKey() []byte {
+	return b.endKey
+}
+
+// String implements the fmt.Stringer interface.
+func (b *BucketTreeItem) String() string {
+	return fmt.Sprintf("[region-id:%d][start-key:%s][end-key:%s]",
+		b.regionID, core.HexRegionKeyStr(b.startKey), core.HexRegionKeyStr(b.endKey))
+}
+
+// Less returns true if the start key is less than the other.
+func (b *BucketTreeItem) Less(than btree.Item) bool {
+	return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0
+}
+
+// equals returns true if the item has the same key range as the origin.
+func (b *BucketTreeItem) equals(origin *BucketTreeItem) bool {
+	if origin == nil {
+		return false
+	}
+	return bytes.Equal(b.startKey, origin.startKey) && bytes.Equal(b.endKey, origin.endKey)
+}
+
+// cloneBucketItemByRange returns a new item clipped to the given key range.
+// the item must have some debris within the given key range.
+func (b *BucketTreeItem) cloneBucketItemByRange(startKey, endKey []byte) *BucketTreeItem {
+	item := &BucketTreeItem{
+		regionID: b.regionID,
+		startKey: startKey,
+		endKey:   endKey,
+		interval: b.interval,
+		version:  b.version,
+		stats:    make([]*BucketStat, 0, len(b.stats)),
+		status:   archive,
+	}
+
+	for _, stat := range b.stats {
+		// insert if the stat has debris within the key range.
+		left := keyutil.MaxKey(stat.StartKey, startKey)
+		right := keyutil.MinKey(stat.EndKey, endKey)
+		if len(endKey) == 0 {
+			right = stat.EndKey
+		}
+		if bytes.Compare(left, right) < 0 {
+			copy := stat.clone()
+			copy.StartKey = left
+			copy.EndKey = right
+			item.stats = append(item.stats, copy)
+		}
+	}
+	return item
+}
+
+// inherit the hot stats from the old items to the new item.
+// rule1: if any overlapping bucket is hot, it inherits the hottest one.
+// rule2: if none of the overlapping buckets is hot, it inherits the coldest one.
+// rule3: if some overlapping buckets are hot and the others are cold, it inherits the hottest one.
+func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) {
+	if len(origins) == 0 || len(b.stats) == 0 || bytes.Compare(b.endKey, origins[0].startKey) < 0 {
+		return
+	}
+
+	newItems := b.stats
+	oldItems := make([]*BucketStat, 0)
+	for _, bucketTree := range origins {
+		oldItems = append(oldItems, bucketTree.stats...)
+	}
+	// given two lists of closed intervals, newItems and oldItems, where items[i]=[start-key, end-key],
+	// the items in each list are disjoint and sorted by key.
+	// the new item inherits the hot degree wherever it intersects an old item.
+	for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); {
+		newItem, oldItem := newItems[p1], oldItems[p2]
+		left := keyutil.MaxKey(newItem.StartKey, oldItem.StartKey)
+		right := keyutil.MinKey(newItem.EndKey, oldItem.EndKey)
+
+		// the bucket inherits the old bucket hot degree if they have some intersection.
+		// skip if the left key is equal to the right key, such as [10 20] and [20 30].
+		// new bucket: |10 ---- 20 |
+		// old bucket: | 5 ---------15|
+		// they have one intersection |10--15|.
+		if bytes.Compare(left, right) < 0 || len(right) == 0 {
+			oldDegree := oldItem.HotDegree
+			newDegree := newItem.HotDegree
+			// the new bucket inherits the old hot degree if both are not hot and the old one is colder.
+			if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree {
+				newItem.HotDegree = oldDegree
+			}
+			// the new bucket inherits the old hot degree if the old one is hot and hotter than the new one.
+			if oldDegree > 0 && oldDegree > newDegree {
+				newItem.HotDegree = oldDegree
+			}
+		}
+		// move forward the item whose end key is smaller.
+		if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 || len(newItem.EndKey) == 0 {
+			p2++
+		} else {
+			p1++
+		}
+	}
+}
+
+func (b *BucketTreeItem) calculateHotDegree() {
+	for _, stat := range b.stats {
+		// todo: qps should be considered, tikv will report this in next sprint
+		// the order: read [bytes keys qps] and write [bytes keys qps]
+		readLoads := stat.Loads[:2]
+		readHot := slice.AllOf(readLoads, func(i int) bool {
+			return readLoads[i] > minHotThresholds[i]
+		})
+		writeLoads := stat.Loads[3:5]
+		writeHot := slice.AllOf(writeLoads, func(i int) bool {
+			return writeLoads[i] > minHotThresholds[3+i]
+		})
+		hot := readHot || writeHot
+		if hot && stat.HotDegree < maxHotDegree {
+			stat.HotDegree++
+		}
+		if !hot && stat.HotDegree > minHotDegree {
+			stat.HotDegree--
+		}
+	}
+}
diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go
new file mode 100644
index 00000000000..5aba9ffdab7
--- /dev/null
+++ b/server/statistics/buckets/hot_bucket_cache.go
@@ -0,0 +1,222 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buckets
+
+import (
+	"bytes"
+	"context"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/keyutil"
+	"github.com/tikv/pd/pkg/logutil"
+	"github.com/tikv/pd/pkg/rangetree"
+	"go.uber.org/zap"
+)
+
+type status int
+
+const (
+	alive status = iota
+	archive
+)
+
+const (
+	// queue is the length of the channel used to send the statistics.
+	queue = 20000
+	// bucketBtreeDegree is the degree of the btree used to store the bucket.
+	bucketBtreeDegree = 10
+
+	// the hot degree is kept within the range [minHotDegree, maxHotDegree].
+	minHotDegree = -20
+	maxHotDegree = 20
+)
+
+// HotBucketCache is the cache of hot stats.
+type HotBucketCache struct {
+	tree            *rangetree.RangeTree       // key range -> BucketTreeItem
+	bucketsOfRegion map[uint64]*BucketTreeItem // regionId -> BucketTreeItem
+	taskQueue       chan flowBucketsItemTask
+	ctx             context.Context
+}
+
+// bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range.
+// start and end key:   |001------------------------200|
+// the split key range:      |050---150|
+// returns debris:      |001-----050|    |150------200|
+func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []rangetree.RangeItem {
+	var res []rangetree.RangeItem
+	left := keyutil.MaxKey(startKey, item.GetStartKey())
+	right := keyutil.MinKey(endKey, item.GetEndKey())
+	if len(endKey) == 0 {
+		right = item.GetEndKey()
+	}
+	if len(item.GetEndKey()) == 0 {
+		right = endKey
+	}
+	// they have no intersection.
+	// key range:   |001--------------100|
+	// bucket tree:                  |100-----------200|
+	if bytes.Compare(left, right) > 0 && len(right) != 0 {
+		return nil
+	}
+
+	bt := item.(*BucketTreeItem)
+	// there will be no debris if the left is equal to the start key.
+	if !bytes.Equal(item.GetStartKey(), left) {
+		res = append(res, bt.cloneBucketItemByRange(item.GetStartKey(), left))
+	}
+	// there will be no debris if the right is equal to the end key.
+	if !bytes.Equal(item.GetEndKey(), right) || len(right) == 0 {
+		if len(right) == 0 {
+			res = append(res, bt.cloneBucketItemByRange(item.GetEndKey(), right))
+		} else {
+			res = append(res, bt.cloneBucketItemByRange(right, item.GetEndKey()))
+		}
+	}
+	return res
+}
+
+// NewBucketsCache is the constructor for HotBucketCache.
+func NewBucketsCache(ctx context.Context) *HotBucketCache {
+	bucketCache := &HotBucketCache{
+		ctx:             ctx,
+		bucketsOfRegion: make(map[uint64]*BucketTreeItem),
+		tree:            rangetree.NewRangeTree(bucketBtreeDegree, bucketDebrisFactory),
+		taskQueue:       make(chan flowBucketsItemTask, queue),
+	}
+	go bucketCache.schedule()
+	return bucketCache
+}
+
+// putItem puts the item into the cache.
+func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeItem) {
+	// only update the origin in place if the key range is the same.
+	if origin := h.bucketsOfRegion[item.regionID]; item.equals(origin) {
+		*origin = *item
+		return
+	}
+	for _, overlap := range overlaps {
+		if overlap.status == alive {
+			log.Debug("delete buckets from cache",
+				zap.Uint64("region-id", overlap.regionID),
+				logutil.ZapRedactByteString("start-key", overlap.GetStartKey()),
+				logutil.ZapRedactByteString("end-key", overlap.GetEndKey()))
+			delete(h.bucketsOfRegion, overlap.regionID)
+		}
+	}
+	h.bucketsOfRegion[item.regionID] = item
+	h.tree.Update(item)
+}
+
+// CheckAsync tries to send the task to the task queue; it returns false if the queue is full.
+func (h *HotBucketCache) CheckAsync(task flowBucketsItemTask) bool {
+	select {
+	case h.taskQueue <- task:
+		return true
+	default:
+		return false
+	}
+}
+
+func (h *HotBucketCache) schedule() {
+	for {
+		select {
+		case <-h.ctx.Done():
+			return
+		case task := <-h.taskQueue:
+			task.runTask(h)
+		}
+	}
+}
+
+// checkBucketsFlow returns the preprocessed bucket tree item and the overlapped items.
+// step1: convert the reported buckets to a bucket tree item.
+// step2: inherit the old bucket states.
+// step3: update the bucket states.
+func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *BucketTreeItem, overlaps []*BucketTreeItem) {
+	newItem = convertToBucketTreeItem(buckets)
+	// reuse the origin directly if it exists and the key range is unchanged.
+	if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.equals(origin) {
+		overlaps = []*BucketTreeItem{origin}
+	} else {
+		overlaps = h.getBucketsByKeyRange(newItem.startKey, newItem.endKey)
+	}
+	newItem.inherit(overlaps)
+	newItem.calculateHotDegree()
+	return newItem, overlaps
+}
+
+// getBucketsByKeyRange returns the items that overlap with the given key range.
+func (h *HotBucketCache) getBucketsByKeyRange(startKey, endKey []byte) (items []*BucketTreeItem) {
+	item := &BucketTreeItem{startKey: startKey, endKey: endKey}
+	ringItems := h.tree.GetOverlaps(item)
+	for _, item := range ringItems {
+		bucketItem := item.(*BucketTreeItem)
+		items = append(items, bucketItem)
+	}
+	return
+}
+
+// convertToBucketTreeItem converts the reported buckets to a bucket tree item.
+func convertToBucketTreeItem(buckets *metapb.Buckets) *BucketTreeItem {
+	items := make([]*BucketStat, len(buckets.Keys)-1)
+	interval := buckets.PeriodInMs
+	// the interval may be zero right after the tikv initialization.
+	if interval == 0 {
+		interval = 10 * 1000
+	}
+	for i := 0; i < len(buckets.Keys)-1; i++ {
+		loads := []uint64{
+			buckets.Stats.ReadBytes[i] * 1000 / interval,
+			buckets.Stats.ReadKeys[i] * 1000 / interval,
+			buckets.Stats.ReadQps[i] * 1000 / interval,
+			buckets.Stats.WriteBytes[i] * 1000 / interval,
+			buckets.Stats.WriteKeys[i] * 1000 / interval,
+			buckets.Stats.WriteQps[i] * 1000 / interval,
+		}
+		items[i] = &BucketStat{
+			RegionID:  buckets.RegionId,
+			StartKey:  buckets.Keys[i],
+			EndKey:    buckets.Keys[i+1],
+			HotDegree: 0,
+			Loads:     loads,
+			Interval:  interval,
+		}
+	}
+	return &BucketTreeItem{
+		startKey: getStartKey(buckets),
+		endKey:   getEndKey(buckets),
+		regionID: buckets.RegionId,
+		stats:    items,
+		interval: interval,
+		version:  buckets.Version,
+		status:   alive,
+	}
+}
+
+func getEndKey(buckets *metapb.Buckets) []byte {
+	if len(buckets.GetKeys()) == 0 {
+		return nil
+	}
+	return buckets.Keys[len(buckets.Keys)-1]
+}
+
+func getStartKey(buckets *metapb.Buckets) []byte {
+	if len(buckets.GetKeys()) == 0 {
+		return nil
+	}
+	return buckets.Keys[0]
+}
diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go
new file mode 100644
index 00000000000..7c8cc85e99c
--- /dev/null
+++ b/server/statistics/buckets/hot_bucket_cache_test.go
@@ -0,0 +1,268 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buckets
+
+import (
+	"context"
+	"testing"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+)
+
+func Test(t *testing.T) {
+	TestingT(t)
+}
+
+var _ = Suite(&testHotBucketCache{})
+
+type testHotBucketCache struct{}
+
+func (t *testHotBucketCache) TestPutItem(c *C) {
+	cache := NewBucketsCache(context.Background())
+	testdata := []struct {
+		regionID    uint64
+		keys        [][]byte
+		regionCount int
+		treeLen     int
+		version     uint64
+	}{{
+		regionID:    1,
+		keys:        [][]byte{[]byte(""), []byte("")},
+		regionCount: 1,
+		treeLen:     1,
+	}, {
+		// case 1: region split
+		// origin: |""-----------------------""|
+		// new:        |10--20--30|
+		// tree:   |""----10--20--30--------""|
+		regionID:    1,
+		keys:        [][]byte{[]byte("10"), []byte("20"), []byte("30")},
+		regionCount: 1,
+		version:     2,
+		treeLen:     3,
+	}, {
+		// case 2: region split
+		// origin: |""--10-----------30---""|
+		// new:            |15  20|
+		// tree:   |""--10--15--20--30--""|
+		regionID:    2,
+		keys:        [][]byte{[]byte("15"), []byte("20")},
+		regionCount: 1,
+		treeLen:     5,
+	}, {
+		// case 3: region split
+		// origin: |""--10--15--20--30--""|
+		// new:                |20 ---- ""|
+		// tree:   |""--10--15--20------ ""|
+		regionID:    1,
+		keys:        [][]byte{[]byte("20"), []byte("")},
+		version:     3,
+		regionCount: 2,
+		treeLen:     4,
+	}, {
+		// case 4: region split
+		// tree:   |""--10--15--20------ ""|
+		// new:    |""----------20|
+		// tree:   |""----------20--------""|
+		regionID:    3,
+		keys:        [][]byte{[]byte(""), []byte("20")},
+		regionCount: 2,
+		treeLen:     2,
+	}, {
+		// case 5: regions 1, 2 and 3 will be merged.
+		regionID:    4,
+		keys:        [][]byte{[]byte(""), []byte("")},
+		regionCount: 1,
+		treeLen:     1,
+	}}
+	for _, v := range testdata {
+		bucket := convertToBucketTreeItem(newTestBuckets(v.regionID, v.version, v.keys, 10))
+		c.Assert(bucket.GetStartKey(), BytesEquals, v.keys[0])
+		c.Assert(bucket.GetEndKey(), BytesEquals, v.keys[len(v.keys)-1])
+		cache.putItem(bucket, cache.getBucketsByKeyRange(bucket.GetStartKey(), bucket.GetEndKey()))
+		c.Assert(cache.bucketsOfRegion, HasLen, v.regionCount)
+		c.Assert(cache.tree.Len(), Equals, v.treeLen)
+		c.Assert(cache.bucketsOfRegion[v.regionID], NotNil)
+		c.Assert(cache.getBucketsByKeyRange([]byte("10"), nil), NotNil)
+	}
+}
+
+func (t *testHotBucketCache) TestConvertToBucketTreeStat(c *C) {
+	buckets := &metapb.Buckets{
+		RegionId: 1,
+		Version:  0,
+		Keys:     [][]byte{{'1'}, {'2'}, {'3'}, {'4'}, {'5'}},
+		Stats: &metapb.BucketStats{
+			ReadBytes:  []uint64{1, 2, 3, 4},
+			ReadKeys:   []uint64{1, 2, 3, 4},
+			ReadQps:    []uint64{1, 2, 3, 4},
+			WriteBytes: []uint64{1, 2, 3, 4},
+			WriteKeys:  []uint64{1, 2, 3, 4},
+			WriteQps:   []uint64{1, 2, 3, 4},
+		},
+		PeriodInMs: 1000,
+	}
+	item := convertToBucketTreeItem(buckets)
+	c.Assert(item.startKey, BytesEquals, []byte{'1'})
+	c.Assert(item.endKey, BytesEquals, []byte{'5'})
+	c.Assert(item.regionID, Equals, uint64(1))
+	c.Assert(item.version, Equals, uint64(0))
+	c.Assert(item.stats, HasLen, 4)
+}
+
+func (t *testHotBucketCache) TestGetBucketsByKeyRange(c *C) {
+	cache := NewBucketsCache(context.Background())
+	bucket1 := newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("015")}, 0)
+	bucket2 := newTestBuckets(2, 1, [][]byte{[]byte("015"), []byte("020")}, 0)
+	bucket3 := newTestBuckets(3, 1, [][]byte{[]byte("020"), []byte("")}, 0)
+	cache.putItem(cache.checkBucketsFlow(bucket1))
+	cache.putItem(cache.checkBucketsFlow(bucket2))
+	cache.putItem(cache.checkBucketsFlow(bucket3))
+	c.Assert(cache.getBucketsByKeyRange([]byte(""), []byte("100")), HasLen, 3)
+	c.Assert(cache.getBucketsByKeyRange([]byte("030"), []byte("100")), HasLen, 1)
+	c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("030")), HasLen, 3)
+	c.Assert(cache.getBucketsByKeyRange([]byte("015"), []byte("020")), HasLen, 1)
+	c.Assert(cache.getBucketsByKeyRange([]byte("001"), []byte("")), HasLen, 3)
+	c.Assert(cache.bucketsOfRegion, HasLen, 3)
+}
+
+func (t *testHotBucketCache) TestInherit(c *C) {
+	originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("20"), []byte("50"), []byte("")}, 0))
+	originBucketItem.stats[0].HotDegree = 3
+	originBucketItem.stats[1].HotDegree = 2
+	originBucketItem.stats[2].HotDegree = 10
+
+	testdata := []struct {
+		buckets *metapb.Buckets
+		expect  []int
+	}{{
+		// case1: one old bucket can be inherited by many new buckets.
+		buckets: newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("20"), []byte("30"), []byte("40"), []byte("50")}, 0),
+		expect:  []int{3, 2, 2, 2},
+	}, {
+		// case2: the first start key is less than the end key of the old item.
+		buckets: newTestBuckets(1, 1, [][]byte{[]byte("20"), []byte("45"), []byte("50")}, 0),
+		expect:  []int{2, 2},
+	}, {
+		// case3: the new bucket is contained in the first old bucket.
+		buckets: newTestBuckets(1, 1, [][]byte{[]byte("00"), []byte("05")}, 0),
+		expect:  []int{3},
+	}, {
+		// case4: the start key of the new item is greater than the old one's.
+		buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("")}, 0),
+		expect:  []int{10},
+	}, {
+		buckets: newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("")}, 0),
+		expect:  []int{10},
+	}}
+
+	// init key range: |""--20---50---""| with hot degrees (3 2 10)
+	for _, v := range testdata {
+		buckets := convertToBucketTreeItem(v.buckets)
+		buckets.inherit([]*BucketTreeItem{originBucketItem})
+		c.Assert(buckets.stats, HasLen, len(v.expect))
+		for k, v := range v.expect {
+			c.Assert(buckets.stats[k].HotDegree, Equals, v)
+		}
+	}
+}
+
+func (t *testHotBucketCache) TestBucketTreeItemClone(c *C) {
+	// bucket range: [010,020][020,100]
+	origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("020"), []byte("100")}, uint64(0)))
+	testdata := []struct {
+		startKey []byte
+		endKey   []byte
+		count    int
+		strict   bool
+	}{{
+		startKey: []byte("010"),
+		endKey:   []byte("100"),
+		count:    2,
+		strict:   true,
+	}, {
+		startKey: []byte("000"),
+		endKey:   []byte("010"),
+		count:    0,
+		strict:   false,
+	}, {
+		startKey: []byte("100"),
+		endKey:   []byte("200"),
+		count:    0,
+		strict:   false,
+	}, {
+		startKey: []byte("000"),
+		endKey:   []byte("020"),
+		count:    1,
+		strict:   false,
+	}, {
+		startKey: []byte("015"),
+		endKey:   []byte("095"),
+		count:    2,
+		strict:   true,
+	}, {
+		startKey: []byte("015"),
+		endKey:   []byte("200"),
+		count:    2,
+		strict:   false,
+	}}
+	for _, v := range testdata {
+		copy := origin.cloneBucketItemByRange(v.startKey, v.endKey)
+		c.Assert(copy.startKey, BytesEquals, v.startKey)
+		c.Assert(copy.endKey, BytesEquals, v.endKey)
+		c.Assert(copy.stats, HasLen, v.count)
+		if v.count > 0 && v.strict {
+			c.Assert(copy.stats[0].StartKey, BytesEquals, v.startKey)
+			c.Assert(copy.stats[len(copy.stats)-1].EndKey, BytesEquals, v.endKey)
+		}
+	}
+}
+
+func (t *testHotBucketCache) TestCalculateHotDegree(c *C) {
+	origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("100")}, uint64(0)))
+	origin.calculateHotDegree()
+	c.Assert(origin.stats[0].HotDegree, Equals, -1)
+
+	// case1: the read dimension will be hot
+	origin.stats[0].Loads = []uint64{minHotThresholds[0] + 1, minHotThresholds[1] + 1, 0, 0, 0, 0}
+	origin.calculateHotDegree()
+	c.Assert(origin.stats[0].HotDegree, Equals, 0)
+
+	// case2: the write dimension will be hot
+	origin.stats[0].Loads = []uint64{0, 0, 0, minHotThresholds[3] + 1, minHotThresholds[4] + 1, 0}
+	origin.calculateHotDegree()
+	c.Assert(origin.stats[0].HotDegree, Equals, 1)
+}
+
+func newTestBuckets(regionID uint64, version uint64, keys [][]byte, flow uint64) *metapb.Buckets {
+	flows := make([]uint64, len(keys)-1)
+	for i := range keys {
+		if i == len(keys)-1 {
+			continue
+		}
+		flows[i] = flow
+	}
+	rst := &metapb.Buckets{RegionId: regionID, Version: version, Keys: keys, PeriodInMs: 1000,
+		Stats: &metapb.BucketStats{
+			ReadBytes:  flows,
+			ReadKeys:   flows,
+			ReadQps:    flows,
+			WriteBytes: flows,
+			WriteKeys:  flows,
+			WriteQps:   flows,
+		}}
+	return rst
+}
diff --git a/server/statistics/buckets/hot_bucket_task.go b/server/statistics/buckets/hot_bucket_task.go
new file mode 100644
index 00000000000..d873dcf50be
--- /dev/null
+++ b/server/statistics/buckets/hot_bucket_task.go
@@ -0,0 +1,59 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package buckets
+
+import (
+	"github.com/pingcap/kvproto/pkg/metapb"
+)
+
+type flowItemTaskKind uint32
+
+const (
+	checkBucketsTaskType flowItemTaskKind = iota
+)
+
+func (kind flowItemTaskKind) String() string {
+	if kind == checkBucketsTaskType {
+		return "check_buckets"
+	}
+	return "unknown"
+}
+
+// flowBucketsItemTask indicates the task in the flowItem queue.
+type flowBucketsItemTask interface {
+	taskType() flowItemTaskKind
+	runTask(cache *HotBucketCache)
+}
+
+// checkBucketsTask indicates the task in the checkBuckets queue.
+type checkBucketsTask struct {
+	Buckets *metapb.Buckets
+}
+
+// NewCheckPeerTask creates a task to check the reported buckets of a peer.
+func NewCheckPeerTask(buckets *metapb.Buckets) flowBucketsItemTask {
+	return &checkBucketsTask{
+		Buckets: buckets,
+	}
+}
+
+func (t *checkBucketsTask) taskType() flowItemTaskKind {
+	return checkBucketsTaskType
+}
+
+func (t *checkBucketsTask) runTask(cache *HotBucketCache) {
+	newItems, overlaps := cache.checkBucketsFlow(t.Buckets)
+	cache.putItem(newItems, overlaps)
+}
diff --git a/tests/client/client_test.go b/tests/client/client_test.go
index 16e8570097d..d53518f05c7 100644
--- a/tests/client/client_test.go
+++ b/tests/client/client_test.go
@@ -774,9 +774,18 @@ func (s *testClientSuite) TestGetRegion(c *C) {
 	breq := &pdpb.ReportBucketsRequest{
 		Header: newHeader(s.srv),
 		Buckets: &metapb.Buckets{
-			RegionId: regionID,
-			Version:  1,
-			Keys:     [][]byte{[]byte("a"), []byte("z")},
+			RegionId:   regionID,
+			Version:    1,
+			Keys:       [][]byte{[]byte("a"), []byte("z")},
+			PeriodInMs: 2000,
+			Stats: &metapb.BucketStats{
+				ReadBytes:  []uint64{1},
+				ReadKeys:   []uint64{1},
+				ReadQps:    []uint64{1},
+				WriteBytes: []uint64{1},
+				WriteKeys:  []uint64{1},
+				WriteQps:   []uint64{1},
+			},
 		},
 	}
 	c.Assert(s.reportBucket.Send(breq), IsNil)
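
Usage sketch (illustrative only, not part of the patch): a minimal, self-contained example of how the new cache is driven, following the same path the patch wires up in HandleReportBuckets (CheckAsync -> checkBucketsTask.runTask -> checkBucketsFlow -> putItem). The report literal and the sleep below are hypothetical values chosen for the example.

package main

import (
	"context"
	"time"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/statistics/buckets"
)

func main() {
	// The cache starts its own schedule() goroutine that drains the task queue.
	cache := buckets.NewBucketsCache(context.Background())

	// A hypothetical report for one bucket covering ["a", "z"). With PeriodInMs=1000 the
	// loads stay as reported, and 64 KiB read bytes is above the 8 KiB read-bytes threshold.
	report := &metapb.Buckets{
		RegionId:   1,
		Version:    1,
		Keys:       [][]byte{[]byte("a"), []byte("z")},
		PeriodInMs: 1000,
		Stats: &metapb.BucketStats{
			ReadBytes:  []uint64{64 * 1024},
			ReadKeys:   []uint64{256},
			ReadQps:    []uint64{0},
			WriteBytes: []uint64{0},
			WriteKeys:  []uint64{0},
			WriteQps:   []uint64{0},
		},
	}

	// Asynchronous: CheckAsync returns false only if the task queue is full.
	cache.CheckAsync(buckets.NewCheckPeerTask(report))
	time.Sleep(100 * time.Millisecond) // toy example only: give the worker goroutine time to run
}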