From ce6bc977e8b0b5db9df4731de2aafa4cea70baf1 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 18 May 2022 12:10:09 +0800 Subject: [PATCH 1/6] squash Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/keyutil/util.go | 15 ++ server/cluster/cluster.go | 3 + server/cluster/cluster_worker.go | 9 +- .../buckets/bucket_stat_informer.go | 207 +++++++++++++++ server/statistics/buckets/hot_bucket_cache.go | 212 +++++++++++++++ .../buckets/hot_bucket_cache_test.go | 247 ++++++++++++++++++ server/statistics/buckets/hot_bucket_task.go | 59 +++++ tests/client/client_test.go | 15 +- 8 files changed, 762 insertions(+), 5 deletions(-) create mode 100644 server/statistics/buckets/bucket_stat_informer.go create mode 100644 server/statistics/buckets/hot_bucket_cache.go create mode 100644 server/statistics/buckets/hot_bucket_cache_test.go create mode 100644 server/statistics/buckets/hot_bucket_task.go diff --git a/pkg/keyutil/util.go b/pkg/keyutil/util.go index ed2e07fa9c5..ee372ebbccf 100644 --- a/pkg/keyutil/util.go +++ b/pkg/keyutil/util.go @@ -15,6 +15,7 @@ package keyutil import ( + "bytes" "encoding/hex" "fmt" ) @@ -23,3 +24,17 @@ import ( func BuildKeyRangeKey(startKey, endKey []byte) string { return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) } + +func MaxKey(a, b []byte) []byte { + if bytes.Compare(a, b) > 0 { + return a + } + return b +} + +func MinKey(a, b []byte) []byte { + if bytes.Compare(a, b) > 0 { + return b + } + return a +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 888163bceaa..64f5cbcd14e 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -51,6 +51,7 @@ import ( "github.com/tikv/pd/server/schedule/placement" "github.com/tikv/pd/server/schedulers" "github.com/tikv/pd/server/statistics" + "github.com/tikv/pd/server/statistics/buckets" "github.com/tikv/pd/server/storage" "github.com/tikv/pd/server/storage/endpoint" "github.com/tikv/pd/server/versioninfo" @@ -130,6 +131,7 @@ type RaftCluster struct { labelLevelStats *statistics.LabelStatistics regionStats *statistics.RegionStatistics hotStat *statistics.HotStat + hotBuckets *buckets.HotBucketCache ruleManager *placement.RuleManager regionLabeler *labeler.RegionLabeler replicationMode *replication.ModeManager @@ -220,6 +222,7 @@ func (c *RaftCluster) InitCluster( c.ctx, c.cancel = context.WithCancel(c.serverCtx) c.labelLevelStats = statistics.NewLabelStatistics() c.hotStat = statistics.NewHotStat(c.ctx) + c.hotBuckets = buckets.NewBucketsCache(c.ctx) c.progressManager = progress.NewManager() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64) diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 33f853bce0a..47a9ae736c3 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/schedule" + "github.com/tikv/pd/server/statistics/buckets" "github.com/tikv/pd/server/versioninfo" "go.uber.org/zap" ) @@ -235,6 +236,10 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque } // HandleReportBuckets processes buckets reports from client -func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error { - return c.processReportBuckets(buckets) +func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error { + if err := c.processReportBuckets(b); err != nil { + return err + } + c.hotBuckets.CheckAsync(buckets.NewCheckPeerTask(b)) + return nil } diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go new file mode 100644 index 00000000000..8cc3b144636 --- /dev/null +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -0,0 +1,207 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "bytes" + "fmt" + "github.com/tikv/pd/pkg/keyutil" + + "github.com/tikv/pd/pkg/btree" + "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/statistics" +) + +var minHotThresholds = [statistics.RegionStatCount]uint64{ + statistics.RegionReadBytes: 8 * 1024, + statistics.RegionReadKeys: 128, + statistics.RegionReadQuery: 128, + statistics.RegionWriteBytes: 1 * 1024, + statistics.RegionWriteKeys: 32, + statistics.RegionWriteQuery: 32, +} + +// BucketStatInformer is used to get the bucket statistics. +type BucketStatInformer interface { + BucketsStats(degree int) map[uint64][]*BucketStat +} + +// BucketStat is the record the bucket statistics. +type BucketStat struct { + RegionID uint64 + StartKey []byte + EndKey []byte + HotDegree int + Interval uint64 + // see statistics.RegionStatKind + Loads []uint64 +} + +func (b *BucketStat) clone() *BucketStat { + c := &BucketStat{ + StartKey: b.StartKey, + EndKey: b.EndKey, + RegionID: b.RegionID, + HotDegree: b.HotDegree, + Interval: b.Interval, + Loads: make([]uint64, len(b.Loads)), + } + copy(c.Loads, b.Loads) + return c +} + +func (b *BucketStat) String() string { + return fmt.Sprintf("[region-id:%d][start-key:%s][end-key-key:%s][hot-degree:%d][Interval:%d(ms)][Loads:%v]", + b.RegionID, core.HexRegionKeyStr(b.StartKey), core.HexRegionKeyStr(b.EndKey), b.HotDegree, b.Interval, b.Loads) +} + +// BucketTreeItem is the item of the bucket btree. +type BucketTreeItem struct { + regionID uint64 + startKey []byte + endKey []byte + stats []*BucketStat + interval uint64 + version uint64 + status status +} + +// GetStartKey returns the start key of the bucket tree. +func (b *BucketTreeItem) GetStartKey() []byte { + return b.startKey +} + +// GetEndKey return the end key of the bucket tree item. +func (b *BucketTreeItem) GetEndKey() []byte { + return b.endKey +} + +// String implements the fmt.Stringer interface. +func (b *BucketTreeItem) String() string { + return fmt.Sprintf("[region-id:%d][start-key:%s][end-key:%s]", + b.regionID, core.HexRegionKeyStr(b.startKey), core.HexRegionKeyStr(b.endKey)) +} + +// Less returns true if the start key is less than the other. +func (b *BucketTreeItem) Less(than btree.Item) bool { + return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0 +} + +// compareKeyRange returns whether the key range is overlaps with the item. +func (b *BucketTreeItem) compareKeyRange(origin *BucketTreeItem) bool { + if origin == nil { + return false + } + // key range must be same if the version is same. + if b.version == origin.version { + return true + } + return bytes.Equal(b.startKey, origin.startKey) && bytes.Equal(b.endKey, origin.endKey) +} + +// cloneBucketItemByRange returns a new item with the same key range. +// item must have some debris for the given key range +func cloneBucketItemByRange(b *BucketTreeItem, startKey, endKey []byte) *BucketTreeItem { + item := &BucketTreeItem{ + regionID: b.regionID, + startKey: startKey, + endKey: endKey, + interval: b.interval, + version: b.version, + stats: make([]*BucketStat, 0, len(b.stats)), + status: archive, + } + + for _, stat := range b.stats { + // insert if the stat has debris with the key range. + left := keyutil.MaxKey(stat.StartKey, startKey) + right := keyutil.MinKey(stat.EndKey, endKey) + if bytes.Compare(left, right) < 0 { + copy := stat.clone() + copy.StartKey = left + copy.EndKey = right + item.stats = append(item.stats, copy) + } + } + return item +} + +// inherit the hot stats from the old item to the new item. +// rule1: if one cross buckets are hot , it will inherit the hottest one. +// rule2: if the cross buckets are not hot, it will inherit the coldest one. +// rule3: if some cross buckets are hot and the others are cold, it will inherit the hottest one. +func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { + if len(origins) == 0 || len(b.stats) == 0 || bytes.Compare(b.endKey, origins[0].startKey) < 0 { + return + } + + newItems := b.stats + oldItems := make([]*BucketStat, 0) + for _, bucketTree := range origins { + oldItems = append(oldItems, bucketTree.stats...) + } + // details: https://leetcode.cn/problems/interval-list-intersections/solution/jiu-pa-ni-bu-dong-shuang-zhi-zhen-by-hyj8/ + for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); { + newItem, oldItem := newItems[p1], oldItems[p2] + left := keyutil.MaxKey(newItem.StartKey, oldItems[p2].StartKey) + right := keyutil.MinKey(newItem.EndKey, oldItems[p2].EndKey) + + // bucket should inherit the old bucket hot degree if they have some intersection. + // skip if the left is equal to the right key, such as [10 20] [20 30]. + // new bucket: |10 ---- 20 | + // old bucket: | 5 ---------15| + // they has one intersection |10-----15|. + if bytes.Compare(left, right) < 0 { + oldDegree := oldItems[p2].HotDegree + newDegree := newItems[p1].HotDegree + // new bucket should interim old if the hot degree of the new bucket is less than zero. + if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree { + newItem.HotDegree = oldDegree + } + // if oldDegree is greater than zero and the new bucket, the new bucket should inherit the old hot degree. + if oldDegree > 0 && oldDegree > newDegree { + newItem.HotDegree = oldDegree + } + } + // move the left item to the next, old should move first if they are equal. + if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 { + p2++ + } else { + p1++ + } + } +} + +func (b *BucketTreeItem) calculateHotDegree() { + for _, stat := range b.stats { + // todo: qps should be considered, tikv will report this in next sprint + readLoads := stat.Loads[:2] + readHot := slice.AllOf(readLoads, func(i int) bool { + return readLoads[i] > minHotThresholds[i] + }) + writeLoads := stat.Loads[3:5] + writeHot := slice.AllOf(writeLoads, func(i int) bool { + return writeLoads[i] > minHotThresholds[3+i] + }) + hot := readHot || writeHot + if hot && stat.HotDegree < maxHotDegree { + stat.HotDegree++ + } + if !hot && stat.HotDegree > minHotDegree { + stat.HotDegree-- + } + } +} diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go new file mode 100644 index 00000000000..89f5e9357c0 --- /dev/null +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -0,0 +1,212 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "bytes" + "context" + "github.com/tikv/pd/pkg/keyutil" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/rangetree" + "github.com/tikv/pd/server/core" + "go.uber.org/zap" +) + +type status int + +const ( + alive status = iota + archive +) + +const ( + // queue is the length of the channel used to send the statistics. + queue = 20000 + // bucketBtreeDegree is the degree of the btree used to store the bucket. + bucketBtreeDegree = 10 + + // the range of the hot degree should be [-100, 100] + minHotDegree = -100 + maxHotDegree = 100 +) + +// HotBucketCache is the cache of hot stats. +type HotBucketCache struct { + tree *rangetree.RangeTree // regionId -> BucketTreeItem + bucketsOfRegion map[uint64]*BucketTreeItem // regionId -> BucketTreeItem + taskQueue chan flowBucketsItemTask + ctx context.Context +} + +// bucketDebrisFactory returns the debris. +// like bucket tree item: | 001------------------------200| +// the split key range: |050---150| +// returns debris: |001-----050| |150------200| +func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []rangetree.RangeItem { + var res []rangetree.RangeItem + left := keyutil.MaxKey(startKey, item.GetStartKey()) + right := keyutil.MinKey(endKey, item.GetEndKey()) + // they have no intersection. + // key range: |001--------------100| + // bucket tree: |100-----------200| + if bytes.Compare(left, right) > 0 { + return nil + } + bt := item.(*BucketTreeItem) + // there will be no debris if the left is equal to the start key. + if !bytes.Equal(item.GetStartKey(), left) { + res = append(res, cloneBucketItemByRange(bt, item.GetStartKey(), left)) + } + + // there will be no debris if the right is equal to the end key. + if !bytes.Equal(item.GetEndKey(), right) { + res = append(res, cloneBucketItemByRange(bt, right, item.GetEndKey())) + } + return res +} + +// NewBucketsCache is the constructor for HotBucketCache. +func NewBucketsCache(ctx context.Context) *HotBucketCache { + bucketCache := &HotBucketCache{ + ctx: ctx, + bucketsOfRegion: make(map[uint64]*BucketTreeItem), + tree: rangetree.NewRangeTree(bucketBtreeDegree, bucketDebrisFactory), + taskQueue: make(chan flowBucketsItemTask, queue), + } + go bucketCache.schedule() + return bucketCache +} + +// putItem puts the item into the cache. +func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeItem) { + // only update origin if the key range is same. + if origin := h.bucketsOfRegion[item.regionID]; item.compareKeyRange(origin) { + *origin = *item + return + } + for _, overlap := range overlaps { + if overlap.status == alive { + log.Debug("delete buckets from cache", + zap.Uint64("region-id", overlap.regionID), + zap.String("start-key", core.HexRegionKeyStr(overlap.GetStartKey())), + zap.String("end-key", core.HexRegionKeyStr(overlap.GetEndKey()))) + delete(h.bucketsOfRegion, overlap.regionID) + } + } + h.bucketsOfRegion[item.regionID] = item + h.tree.Update(item) +} + +// CheckAsync returns true if the task queue is not full. +func (h *HotBucketCache) CheckAsync(task flowBucketsItemTask) bool { + select { + case h.taskQueue <- task: + return true + default: + return false + } +} + +func (h *HotBucketCache) schedule() { + for { + select { + case <-h.ctx.Done(): + return + case task := <-h.taskQueue: + task.runTask(h) + } + } +} + +// checkBucketsFlow returns the preprocessor bucket tree item and the overlaps. +// step1: convert to bucket tree item. +// step2: inherit old bucket states. +// step3: update bucket states. +func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *BucketTreeItem, overlaps []*BucketTreeItem) { + newItem = convertToBucketTreeItem(buckets) + // origin is existed and the version is same. + if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.compareKeyRange(origin) { + overlaps = []*BucketTreeItem{origin} + } else { + overlaps = h.getBucketsByKeyRange(newItem.startKey, newItem.endKey) + } + newItem.inherit(overlaps) + newItem.calculateHotDegree() + return newItem, overlaps +} + +// getBucketsByKeyRange returns the overlaps with the key range. +func (h *HotBucketCache) getBucketsByKeyRange(startKey, endKey []byte) (items []*BucketTreeItem) { + item := &BucketTreeItem{startKey: startKey, endKey: endKey} + ringItems := h.tree.GetOverlaps(item) + for _, item := range ringItems { + bucketItem := item.(*BucketTreeItem) + items = append(items, bucketItem) + } + return +} + +// convertToBucketTreeItem converts the bucket stat to bucket tree item. +func convertToBucketTreeItem(buckets *metapb.Buckets) *BucketTreeItem { + items := make([]*BucketStat, len(buckets.Keys)-1) + interval := buckets.PeriodInMs + // Interval may be zero after the tikv init. + if interval == 0 { + interval = 10 * 1000 + } + for i := 0; i < len(buckets.Keys)-1; i++ { + loads := []uint64{ + buckets.Stats.ReadBytes[i] * 1000 / interval, + buckets.Stats.ReadKeys[i] * 1000 / interval, + buckets.Stats.ReadQps[i] * 1000 / interval, + buckets.Stats.WriteBytes[i] * 1000 / interval, + buckets.Stats.WriteKeys[i] * 1000 / interval, + buckets.Stats.WriteQps[i] * 1000 / interval, + } + items[i] = &BucketStat{ + RegionID: buckets.RegionId, + StartKey: buckets.Keys[i], + EndKey: buckets.Keys[i+1], + HotDegree: 0, + Loads: loads, + Interval: interval, + } + } + return &BucketTreeItem{ + startKey: getStartKey(buckets), + endKey: getEndKey(buckets), + regionID: buckets.RegionId, + stats: items, + interval: interval, + version: buckets.Version, + status: alive, + } +} + +func getEndKey(buckets *metapb.Buckets) []byte { + if len(buckets.GetKeys()) == 0 { + return nil + } + return buckets.Keys[len(buckets.Keys)-1] +} + +func getStartKey(buckets *metapb.Buckets) []byte { + if len(buckets.GetKeys()) == 0 { + return nil + } + return buckets.Keys[0] +} diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go new file mode 100644 index 00000000000..60d19bee5c7 --- /dev/null +++ b/server/statistics/buckets/hot_bucket_cache_test.go @@ -0,0 +1,247 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "context" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testHotBucketCache{}) + +type testHotBucketCache struct{} + +func (t *testHotBucketCache) TestPutItem(c *C) { + // case1: region split + // origin: |10|20|30| + // new: |10|15|20|30| + // when report bucket[15:20], the origin should be truncate into two region + cache := NewBucketsCache(context.Background()) + testdata := []struct { + regionID uint64 + keys [][]byte + regionCount int + treeLen int + version uint64 + }{{ + regionID: 1, + keys: [][]byte{[]byte("10"), []byte("20"), []byte("30")}, + regionCount: 1, + treeLen: 1, + }, { + regionID: 2, + keys: [][]byte{[]byte("15"), []byte("20")}, + regionCount: 1, + treeLen: 3, + }, { + regionID: 1, + keys: [][]byte{[]byte("20"), []byte("30")}, + version: 2, + regionCount: 2, + treeLen: 3, + }, { + regionID: 3, + keys: [][]byte{[]byte("10"), []byte("15")}, + regionCount: 3, + treeLen: 3, + }, { + // region 1,2,3 will be merged. + regionID: 4, + keys: [][]byte{[]byte("10"), []byte("30")}, + regionCount: 1, + treeLen: 1, + }} + for _, v := range testdata { + bucket := convertToBucketTreeItem(newTestBuckets(v.regionID, v.version, v.keys, 10)) + c.Assert(bucket.GetStartKey(), BytesEquals, v.keys[0]) + c.Assert(bucket.GetEndKey(), BytesEquals, v.keys[len(v.keys)-1]) + cache.putItem(bucket, cache.getBucketsByKeyRange(bucket.GetStartKey(), bucket.GetEndKey())) + c.Assert(cache.bucketsOfRegion, HasLen, v.regionCount) + c.Assert(cache.tree.Len(), Equals, v.treeLen) + c.Assert(cache.bucketsOfRegion[v.regionID], NotNil) + c.Assert(cache.getBucketsByKeyRange([]byte("10"), nil), NotNil) + } +} + +func (t *testHotBucketCache) TestConvertToBucketTreeStat(c *C) { + buckets := &metapb.Buckets{ + RegionId: 1, + Version: 0, + Keys: [][]byte{{'1'}, {'2'}, {'3'}, {'4'}, {'5'}}, + Stats: &metapb.BucketStats{ + ReadBytes: []uint64{1, 2, 3, 4}, + ReadKeys: []uint64{1, 2, 3, 4}, + ReadQps: []uint64{1, 2, 3, 4}, + WriteBytes: []uint64{1, 2, 3, 4}, + WriteKeys: []uint64{1, 2, 3, 4}, + WriteQps: []uint64{1, 2, 3, 4}, + }, + PeriodInMs: 1000, + } + item := convertToBucketTreeItem(buckets) + c.Assert(item.startKey, BytesEquals, []byte{'1'}) + c.Assert(item.endKey, BytesEquals, []byte{'5'}) + c.Assert(item.regionID, Equals, uint64(1)) + c.Assert(item.version, Equals, uint64(0)) + c.Assert(item.stats, HasLen, 4) +} + +func (t *testHotBucketCache) TestGetBucketsByKeyRange(c *C) { + cache := NewBucketsCache(context.Background()) + bucket1 := newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("015")}, 0) + bucket2 := newTestBuckets(2, 1, [][]byte{[]byte("015"), []byte("020")}, 0) + bucket3 := newTestBuckets(3, 1, [][]byte{[]byte("020"), []byte("030")}, 0) + cache.putItem(cache.checkBucketsFlow(bucket1)) + cache.putItem(cache.checkBucketsFlow(bucket2)) + cache.putItem(cache.checkBucketsFlow(bucket3)) + c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("100")), NotNil) + c.Assert(cache.getBucketsByKeyRange([]byte("030"), []byte("100")), IsNil) + c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("030")), HasLen, 3) + c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("020")), HasLen, 2) + c.Assert(cache.getBucketsByKeyRange([]byte("001"), []byte("010")), HasLen, 0) + c.Assert(cache.bucketsOfRegion, HasLen, 3) +} + +func (t *testHotBucketCache) TestInherit(c *C) { + // init: key range |10 20|20-50|50-60|(3 2 10) + originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("50"), []byte("60")}, 0)) + originBucketItem.stats[0].HotDegree = 3 + originBucketItem.stats[1].HotDegree = 2 + originBucketItem.stats[2].HotDegree = 10 + + testdata := []struct { + buckets *metapb.Buckets + expect []int + }{{ + // case1: one bucket can be inherited by many buckets. + buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30"), []byte("40"), []byte("50")}, 0), + expect: []int{3, 2, 2, 2}, + }, { + // case2: the first start key is less than the end key of old item. + buckets: newTestBuckets(1, 1, [][]byte{[]byte("20"), []byte("45"), []byte("50")}, 0), + expect: []int{2, 2}, + }, { + // case3: the first start key is less than the end key of old item. + buckets: newTestBuckets(1, 1, [][]byte{[]byte("00"), []byte("05")}, 0), + expect: []int{0}, + }, { + // case4: newItem starKey is greater than old. + buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("90")}, 0), + expect: []int{0}, + }} + + for _, v := range testdata { + buckets := convertToBucketTreeItem(v.buckets) + buckets.inherit([]*BucketTreeItem{originBucketItem}) + c.Assert(buckets.stats, HasLen, len(v.expect)) + for k, v := range v.expect { + c.Assert(buckets.stats[k].HotDegree, Equals, v) + } + } +} + +func (t *testHotBucketCache) TestBucketTreeItemClone(c *C) { + // bucket range: [010,020][020,100] + origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("020"), []byte("100")}, uint64(0))) + testdata := []struct { + startKey []byte + endKey []byte + count int + strict bool + }{{ + startKey: []byte("010"), + endKey: []byte("100"), + count: 2, + strict: true, + }, { + startKey: []byte("000"), + endKey: []byte("010"), + count: 0, + strict: false, + }, { + startKey: []byte("100"), + endKey: []byte("200"), + count: 0, + strict: false, + }, { + startKey: []byte("000"), + endKey: []byte("020"), + count: 1, + strict: false, + }, { + startKey: []byte("015"), + endKey: []byte("095"), + count: 2, + strict: true, + }, { + startKey: []byte("015"), + endKey: []byte("200"), + count: 2, + strict: false, + }} + for _, v := range testdata { + copy := cloneBucketItemByRange(origin, v.startKey, v.endKey) + c.Assert(copy.startKey, BytesEquals, v.startKey) + c.Assert(copy.endKey, BytesEquals, v.endKey) + c.Assert(copy.stats, HasLen, v.count) + if v.count > 0 && v.strict { + c.Assert(copy.stats[0].StartKey, BytesEquals, v.startKey) + c.Assert(copy.stats[len(copy.stats)-1].EndKey, BytesEquals, v.endKey) + } + } +} + +func (t *testHotBucketCache) TestCalculateHotDegree(c *C) { + origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("100")}, uint64(0))) + origin.calculateHotDegree() + c.Assert(origin.stats[0].HotDegree, Equals, -1) + + // case1: the dimension of read will be hot + origin.stats[0].Loads = []uint64{minHotThresholds[0] + 1, minHotThresholds[1] + 1, 0, 0, 0, 0} + origin.calculateHotDegree() + c.Assert(origin.stats[0].HotDegree, Equals, 0) + + // case1: the dimension of write will be hot + origin.stats[0].Loads = []uint64{0, 0, 0, minHotThresholds[3] + 1, minHotThresholds[4] + 1, 0} + origin.calculateHotDegree() + c.Assert(origin.stats[0].HotDegree, Equals, 1) +} + +func newTestBuckets(regionID uint64, version uint64, keys [][]byte, flow uint64) *metapb.Buckets { + flows := make([]uint64, len(keys)-1) + for i := range keys { + if i == len(keys)-1 { + continue + } + flows[i] = flow + } + rst := &metapb.Buckets{RegionId: regionID, Version: version, Keys: keys, PeriodInMs: 1000, + Stats: &metapb.BucketStats{ + ReadBytes: flows, + ReadKeys: flows, + ReadQps: flows, + WriteBytes: flows, + WriteKeys: flows, + WriteQps: flows, + }} + return rst +} diff --git a/server/statistics/buckets/hot_bucket_task.go b/server/statistics/buckets/hot_bucket_task.go new file mode 100644 index 00000000000..d873dcf50be --- /dev/null +++ b/server/statistics/buckets/hot_bucket_task.go @@ -0,0 +1,59 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package buckets + +import ( + "github.com/pingcap/kvproto/pkg/metapb" +) + +type flowItemTaskKind uint32 + +const ( + checkBucketsTaskType flowItemTaskKind = iota +) + +func (kind flowItemTaskKind) String() string { + if kind == checkBucketsTaskType { + return "check_buckets" + } + return "unknown" +} + +// flowBucketsItemTask indicates the task in flowItem queue +type flowBucketsItemTask interface { + taskType() flowItemTaskKind + runTask(cache *HotBucketCache) +} + +// checkBucketsTask indicates the task in checkBuckets queue +type checkBucketsTask struct { + Buckets *metapb.Buckets +} + +// NewCheckPeerTask creates task to update peerInfo +func NewCheckPeerTask(buckets *metapb.Buckets) flowBucketsItemTask { + return &checkBucketsTask{ + Buckets: buckets, + } +} + +func (t *checkBucketsTask) taskType() flowItemTaskKind { + return checkBucketsTaskType +} + +func (t *checkBucketsTask) runTask(cache *HotBucketCache) { + newItems, overlaps := cache.checkBucketsFlow(t.Buckets) + cache.putItem(newItems, overlaps) +} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 16e8570097d..d53518f05c7 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -774,9 +774,18 @@ func (s *testClientSuite) TestGetRegion(c *C) { breq := &pdpb.ReportBucketsRequest{ Header: newHeader(s.srv), Buckets: &metapb.Buckets{ - RegionId: regionID, - Version: 1, - Keys: [][]byte{[]byte("a"), []byte("z")}, + RegionId: regionID, + Version: 1, + Keys: [][]byte{[]byte("a"), []byte("z")}, + PeriodInMs: 2000, + Stats: &metapb.BucketStats{ + ReadBytes: []uint64{1}, + ReadKeys: []uint64{1}, + ReadQps: []uint64{1}, + WriteBytes: []uint64{1}, + WriteKeys: []uint64{1}, + WriteQps: []uint64{1}, + }, }, } c.Assert(s.reportBucket.Send(breq), IsNil) From a0aef3bd15596e1391fd688b4dc930ccdad6aed4 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 18 May 2022 14:26:33 +0800 Subject: [PATCH 2/6] address comment Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/keyutil/util.go | 2 ++ .../buckets/bucket_stat_informer.go | 29 ++++++++++--------- server/statistics/buckets/hot_bucket_cache.go | 14 ++++----- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pkg/keyutil/util.go b/pkg/keyutil/util.go index ee372ebbccf..196369bdef2 100644 --- a/pkg/keyutil/util.go +++ b/pkg/keyutil/util.go @@ -25,6 +25,7 @@ func BuildKeyRangeKey(startKey, endKey []byte) string { return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) } +// MaxKey return the bigger key for the given keys. func MaxKey(a, b []byte) []byte { if bytes.Compare(a, b) > 0 { return a @@ -32,6 +33,7 @@ func MaxKey(a, b []byte) []byte { return b } +// MinKey returns the smaller key for the given keys. func MinKey(a, b []byte) []byte { if bytes.Compare(a, b) > 0 { return b diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index 8cc3b144636..4aa0eab8fb7 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -17,9 +17,9 @@ package buckets import ( "bytes" "fmt" - "github.com/tikv/pd/pkg/keyutil" "github.com/tikv/pd/pkg/btree" + "github.com/tikv/pd/pkg/keyutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/statistics" @@ -46,7 +46,7 @@ type BucketStat struct { EndKey []byte HotDegree int Interval uint64 - // see statistics.RegionStatKind + // the order should see statistics.RegionStatKind Loads []uint64 } @@ -100,8 +100,8 @@ func (b *BucketTreeItem) Less(than btree.Item) bool { return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0 } -// compareKeyRange returns whether the key range is overlaps with the item. -func (b *BucketTreeItem) compareKeyRange(origin *BucketTreeItem) bool { +// equals returns whether the key range is overlaps with the item. +func (b *BucketTreeItem) equals(origin *BucketTreeItem) bool { if origin == nil { return false } @@ -153,20 +153,22 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { for _, bucketTree := range origins { oldItems = append(oldItems, bucketTree.stats...) } - // details: https://leetcode.cn/problems/interval-list-intersections/solution/jiu-pa-ni-bu-dong-shuang-zhi-zhen-by-hyj8/ + // given two list of closed intervals like newItems and oldItems, where items[i]=[start-key,end-key], + // and each item are disjoint and sorted order. + // It should calculate the value if some item has intersection. for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); { newItem, oldItem := newItems[p1], oldItems[p2] - left := keyutil.MaxKey(newItem.StartKey, oldItems[p2].StartKey) - right := keyutil.MinKey(newItem.EndKey, oldItems[p2].EndKey) + left := keyutil.MaxKey(newItem.StartKey, oldItem.StartKey) + right := keyutil.MinKey(newItem.EndKey, oldItem.EndKey) // bucket should inherit the old bucket hot degree if they have some intersection. // skip if the left is equal to the right key, such as [10 20] [20 30]. - // new bucket: |10 ---- 20 | - // old bucket: | 5 ---------15| - // they has one intersection |10-----15|. + // new bucket: |10 ---- 20 | + // old bucket: | 5 ---------15| + // they has one intersection |10--15|. if bytes.Compare(left, right) < 0 { - oldDegree := oldItems[p2].HotDegree - newDegree := newItems[p1].HotDegree + oldDegree := oldItem.HotDegree + newDegree := newItem.HotDegree // new bucket should interim old if the hot degree of the new bucket is less than zero. if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree { newItem.HotDegree = oldDegree @@ -187,7 +189,8 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { func (b *BucketTreeItem) calculateHotDegree() { for _, stat := range b.stats { - // todo: qps should be considered, tikv will report this in next sprint + // todo: qps should be considered, tikv will report this in next sprint + // the order: read [bytes keys qps] and write[bytes keys qps] readLoads := stat.Loads[:2] readHot := slice.AllOf(readLoads, func(i int) bool { return readLoads[i] > minHotThresholds[i] diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 89f5e9357c0..09de24c6218 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -17,12 +17,12 @@ package buckets import ( "bytes" "context" - "github.com/tikv/pd/pkg/keyutil" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/keyutil" + "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/pkg/rangetree" - "github.com/tikv/pd/server/core" "go.uber.org/zap" ) @@ -52,7 +52,7 @@ type HotBucketCache struct { ctx context.Context } -// bucketDebrisFactory returns the debris. +// bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range. // like bucket tree item: | 001------------------------200| // the split key range: |050---150| // returns debris: |001-----050| |150------200| @@ -94,7 +94,7 @@ func NewBucketsCache(ctx context.Context) *HotBucketCache { // putItem puts the item into the cache. func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeItem) { // only update origin if the key range is same. - if origin := h.bucketsOfRegion[item.regionID]; item.compareKeyRange(origin) { + if origin := h.bucketsOfRegion[item.regionID]; item.equals(origin) { *origin = *item return } @@ -102,8 +102,8 @@ func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeIte if overlap.status == alive { log.Debug("delete buckets from cache", zap.Uint64("region-id", overlap.regionID), - zap.String("start-key", core.HexRegionKeyStr(overlap.GetStartKey())), - zap.String("end-key", core.HexRegionKeyStr(overlap.GetEndKey()))) + logutil.ZapRedactByteString("start-key", overlap.GetStartKey()), + logutil.ZapRedactByteString("end-key", overlap.GetEndKey())) delete(h.bucketsOfRegion, overlap.regionID) } } @@ -139,7 +139,7 @@ func (h *HotBucketCache) schedule() { func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *BucketTreeItem, overlaps []*BucketTreeItem) { newItem = convertToBucketTreeItem(buckets) // origin is existed and the version is same. - if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.compareKeyRange(origin) { + if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.equals(origin) { overlaps = []*BucketTreeItem{origin} } else { overlaps = h.getBucketsByKeyRange(newItem.startKey, newItem.endKey) From 63bb81cb1c8c17fbf9973121f6b0565e284e8fc6 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Thu, 19 May 2022 12:19:32 +0800 Subject: [PATCH 3/6] bucket-hot Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/keyutil/util.go | 6 +++ pkg/rangetree/range_tree.go | 9 +++- .../buckets/bucket_stat_informer.go | 13 ++++- server/statistics/buckets/hot_bucket_cache.go | 24 +++++++-- .../buckets/hot_bucket_cache_test.go | 49 ++++++++++++------- 5 files changed, 76 insertions(+), 25 deletions(-) diff --git a/pkg/keyutil/util.go b/pkg/keyutil/util.go index 196369bdef2..8574a7c6fe0 100644 --- a/pkg/keyutil/util.go +++ b/pkg/keyutil/util.go @@ -35,6 +35,12 @@ func MaxKey(a, b []byte) []byte { // MinKey returns the smaller key for the given keys. func MinKey(a, b []byte) []byte { + if len(b) == 0 { + return a + } + if len(a) == 0 { + return b + } if bytes.Compare(a, b) > 0 { return b } diff --git a/pkg/rangetree/range_tree.go b/pkg/rangetree/range_tree.go index d647e1e78d0..17b7f0db43b 100644 --- a/pkg/rangetree/range_tree.go +++ b/pkg/rangetree/range_tree.go @@ -17,7 +17,9 @@ package rangetree import ( "bytes" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/btree" + "go.uber.org/zap" ) // RangeItem is one key range tree item. @@ -51,7 +53,11 @@ func (r *RangeTree) Update(item RangeItem) []RangeItem { r.tree.Delete(old) children := r.factory(item.GetStartKey(), item.GetEndKey(), old) for _, child := range children { - if bytes.Compare(child.GetStartKey(), child.GetEndKey()) < 0 { + log.Info("Update range tree", + zap.ByteString("start-key", child.GetStartKey()), + zap.ByteString("end-key", child.GetEndKey()), + ) + if bytes.Compare(child.GetStartKey(), child.GetEndKey()) < 0 || len(child.GetEndKey()) == 0 { r.tree.ReplaceOrInsert(child) } } @@ -82,6 +88,7 @@ func (r *RangeTree) GetOverlaps(item RangeItem) []RangeItem { overlaps = append(overlaps, over) return true }) + log.Info("get over laps", zap.Int("len", len(overlaps))) return overlaps } diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index 4aa0eab8fb7..532b4c21017 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -17,6 +17,8 @@ package buckets import ( "bytes" "fmt" + "github.com/pingcap/log" + "go.uber.org/zap" "github.com/tikv/pd/pkg/btree" "github.com/tikv/pd/pkg/keyutil" @@ -129,6 +131,13 @@ func cloneBucketItemByRange(b *BucketTreeItem, startKey, endKey []byte) *BucketT // insert if the stat has debris with the key range. left := keyutil.MaxKey(stat.StartKey, startKey) right := keyutil.MinKey(stat.EndKey, endKey) + if len(endKey) == 0 { + right = stat.EndKey + } + log.Info("cloneBucketItemByRange", + zap.ByteString("start-key", left), + zap.ByteString("end-key", right), + ) if bytes.Compare(left, right) < 0 { copy := stat.clone() copy.StartKey = left @@ -166,7 +175,7 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { // new bucket: |10 ---- 20 | // old bucket: | 5 ---------15| // they has one intersection |10--15|. - if bytes.Compare(left, right) < 0 { + if bytes.Compare(left, right) < 0 || len(right) == 0 { oldDegree := oldItem.HotDegree newDegree := newItem.HotDegree // new bucket should interim old if the hot degree of the new bucket is less than zero. @@ -179,7 +188,7 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { } } // move the left item to the next, old should move first if they are equal. - if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 { + if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 || len(newItem.EndKey) == 0 { p2++ } else { p1++ diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 09de24c6218..03467c348d7 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -63,19 +63,37 @@ func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []ra // they have no intersection. // key range: |001--------------100| // bucket tree: |100-----------200| - if bytes.Compare(left, right) > 0 { + if bytes.Compare(left, right) > 0 && len(right) != 0 { return nil } + bt := item.(*BucketTreeItem) // there will be no debris if the left is equal to the start key. if !bytes.Equal(item.GetStartKey(), left) { res = append(res, cloneBucketItemByRange(bt, item.GetStartKey(), left)) } - + log.Info("debris factory", + zap.ByteString("start-key", item.GetStartKey()), + zap.ByteString("end-key", item.GetEndKey()), + zap.ByteString("new-start-key", startKey), + zap.ByteString("new-end-key", endKey), + zap.Int("len", len(res)), + ) // there will be no debris if the right is equal to the end key. if !bytes.Equal(item.GetEndKey(), right) { - res = append(res, cloneBucketItemByRange(bt, right, item.GetEndKey())) + if len(right) == 0 { + res = append(res, cloneBucketItemByRange(bt, item.GetEndKey(), right)) + } else { + res = append(res, cloneBucketItemByRange(bt, right, item.GetEndKey())) + } } + log.Info("debris factory", + zap.ByteString("start-key", item.GetStartKey()), + zap.ByteString("end-key", item.GetEndKey()), + zap.ByteString("new-start-key", startKey), + zap.ByteString("new-end-key", endKey), + zap.Int("len", len(res)), + ) return res } diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go index 60d19bee5c7..5aa631ecd9d 100644 --- a/server/statistics/buckets/hot_bucket_cache_test.go +++ b/server/statistics/buckets/hot_bucket_cache_test.go @@ -16,6 +16,7 @@ package buckets import ( "context" + "fmt" "testing" . "github.com/pingcap/check" @@ -31,10 +32,6 @@ var _ = Suite(&testHotBucketCache{}) type testHotBucketCache struct{} func (t *testHotBucketCache) TestPutItem(c *C) { - // case1: region split - // origin: |10|20|30| - // new: |10|15|20|30| - // when report bucket[15:20], the origin should be truncate into two region cache := NewBucketsCache(context.Background()) testdata := []struct { regionID uint64 @@ -44,10 +41,22 @@ func (t *testHotBucketCache) TestPutItem(c *C) { version uint64 }{{ regionID: 1, - keys: [][]byte{[]byte("10"), []byte("20"), []byte("30")}, + keys: [][]byte{[]byte(""), []byte("")}, regionCount: 1, treeLen: 1, }, { + // case1: region split + // origin: |""---""| + // new: |10 20 30| + // tree: |""--10| |10--30| |30 ""| + regionID: 1, + keys: [][]byte{[]byte("10"), []byte("20"), []byte("30")}, + regionCount: 1, + version: 2, + treeLen: 3, + }, { + // case2: region split + // regionID: 2, keys: [][]byte{[]byte("15"), []byte("20")}, regionCount: 1, @@ -55,7 +64,7 @@ func (t *testHotBucketCache) TestPutItem(c *C) { }, { regionID: 1, keys: [][]byte{[]byte("20"), []byte("30")}, - version: 2, + version: 3, regionCount: 2, treeLen: 3, }, { @@ -70,7 +79,8 @@ func (t *testHotBucketCache) TestPutItem(c *C) { regionCount: 1, treeLen: 1, }} - for _, v := range testdata { + for i, v := range testdata { + fmt.Println("case:", i) bucket := convertToBucketTreeItem(newTestBuckets(v.regionID, v.version, v.keys, 10)) c.Assert(bucket.GetStartKey(), BytesEquals, v.keys[0]) c.Assert(bucket.GetEndKey(), BytesEquals, v.keys[len(v.keys)-1]) @@ -107,23 +117,23 @@ func (t *testHotBucketCache) TestConvertToBucketTreeStat(c *C) { func (t *testHotBucketCache) TestGetBucketsByKeyRange(c *C) { cache := NewBucketsCache(context.Background()) - bucket1 := newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("015")}, 0) + bucket1 := newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("015")}, 0) bucket2 := newTestBuckets(2, 1, [][]byte{[]byte("015"), []byte("020")}, 0) - bucket3 := newTestBuckets(3, 1, [][]byte{[]byte("020"), []byte("030")}, 0) + bucket3 := newTestBuckets(3, 1, [][]byte{[]byte("020"), []byte("")}, 0) cache.putItem(cache.checkBucketsFlow(bucket1)) cache.putItem(cache.checkBucketsFlow(bucket2)) cache.putItem(cache.checkBucketsFlow(bucket3)) - c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("100")), NotNil) - c.Assert(cache.getBucketsByKeyRange([]byte("030"), []byte("100")), IsNil) + c.Assert(cache.getBucketsByKeyRange([]byte(""), []byte("100")), HasLen, 3) + c.Assert(cache.getBucketsByKeyRange([]byte("030"), []byte("100")), HasLen, 1) c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("030")), HasLen, 3) - c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("020")), HasLen, 2) - c.Assert(cache.getBucketsByKeyRange([]byte("001"), []byte("010")), HasLen, 0) + c.Assert(cache.getBucketsByKeyRange([]byte("015"), []byte("020")), HasLen, 1) + c.Assert(cache.getBucketsByKeyRange([]byte("001"), []byte("")), HasLen, 3) c.Assert(cache.bucketsOfRegion, HasLen, 3) } func (t *testHotBucketCache) TestInherit(c *C) { // init: key range |10 20|20-50|50-60|(3 2 10) - originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("50"), []byte("60")}, 0)) + originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("20"), []byte("50"), []byte("")}, 0)) originBucketItem.stats[0].HotDegree = 3 originBucketItem.stats[1].HotDegree = 2 originBucketItem.stats[2].HotDegree = 10 @@ -133,7 +143,7 @@ func (t *testHotBucketCache) TestInherit(c *C) { expect []int }{{ // case1: one bucket can be inherited by many buckets. - buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30"), []byte("40"), []byte("50")}, 0), + buckets: newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("20"), []byte("30"), []byte("40"), []byte("50")}, 0), expect: []int{3, 2, 2, 2}, }, { // case2: the first start key is less than the end key of old item. @@ -142,14 +152,15 @@ func (t *testHotBucketCache) TestInherit(c *C) { }, { // case3: the first start key is less than the end key of old item. buckets: newTestBuckets(1, 1, [][]byte{[]byte("00"), []byte("05")}, 0), - expect: []int{0}, + expect: []int{3}, }, { // case4: newItem starKey is greater than old. - buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("90")}, 0), - expect: []int{0}, + buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("")}, 0), + expect: []int{10}, }} - for _, v := range testdata { + for i, v := range testdata { + fmt.Println("case:", i) buckets := convertToBucketTreeItem(v.buckets) buckets.inherit([]*BucketTreeItem{originBucketItem}) c.Assert(buckets.stats, HasLen, len(v.expect)) From ebc9704e56c637e70198de12f5e3cbb1daf66cf8 Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 18 May 2022 14:26:33 +0800 Subject: [PATCH 4/6] address comment Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/keyutil/util.go | 2 ++ .../buckets/bucket_stat_informer.go | 29 ++++++++++--------- server/statistics/buckets/hot_bucket_cache.go | 18 ++++++------ 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/keyutil/util.go b/pkg/keyutil/util.go index ee372ebbccf..196369bdef2 100644 --- a/pkg/keyutil/util.go +++ b/pkg/keyutil/util.go @@ -25,6 +25,7 @@ func BuildKeyRangeKey(startKey, endKey []byte) string { return fmt.Sprintf("%s-%s", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) } +// MaxKey return the bigger key for the given keys. func MaxKey(a, b []byte) []byte { if bytes.Compare(a, b) > 0 { return a @@ -32,6 +33,7 @@ func MaxKey(a, b []byte) []byte { return b } +// MinKey returns the smaller key for the given keys. func MinKey(a, b []byte) []byte { if bytes.Compare(a, b) > 0 { return b diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index 8cc3b144636..4aa0eab8fb7 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -17,9 +17,9 @@ package buckets import ( "bytes" "fmt" - "github.com/tikv/pd/pkg/keyutil" "github.com/tikv/pd/pkg/btree" + "github.com/tikv/pd/pkg/keyutil" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/statistics" @@ -46,7 +46,7 @@ type BucketStat struct { EndKey []byte HotDegree int Interval uint64 - // see statistics.RegionStatKind + // the order should see statistics.RegionStatKind Loads []uint64 } @@ -100,8 +100,8 @@ func (b *BucketTreeItem) Less(than btree.Item) bool { return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0 } -// compareKeyRange returns whether the key range is overlaps with the item. -func (b *BucketTreeItem) compareKeyRange(origin *BucketTreeItem) bool { +// equals returns whether the key range is overlaps with the item. +func (b *BucketTreeItem) equals(origin *BucketTreeItem) bool { if origin == nil { return false } @@ -153,20 +153,22 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { for _, bucketTree := range origins { oldItems = append(oldItems, bucketTree.stats...) } - // details: https://leetcode.cn/problems/interval-list-intersections/solution/jiu-pa-ni-bu-dong-shuang-zhi-zhen-by-hyj8/ + // given two list of closed intervals like newItems and oldItems, where items[i]=[start-key,end-key], + // and each item are disjoint and sorted order. + // It should calculate the value if some item has intersection. for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); { newItem, oldItem := newItems[p1], oldItems[p2] - left := keyutil.MaxKey(newItem.StartKey, oldItems[p2].StartKey) - right := keyutil.MinKey(newItem.EndKey, oldItems[p2].EndKey) + left := keyutil.MaxKey(newItem.StartKey, oldItem.StartKey) + right := keyutil.MinKey(newItem.EndKey, oldItem.EndKey) // bucket should inherit the old bucket hot degree if they have some intersection. // skip if the left is equal to the right key, such as [10 20] [20 30]. - // new bucket: |10 ---- 20 | - // old bucket: | 5 ---------15| - // they has one intersection |10-----15|. + // new bucket: |10 ---- 20 | + // old bucket: | 5 ---------15| + // they has one intersection |10--15|. if bytes.Compare(left, right) < 0 { - oldDegree := oldItems[p2].HotDegree - newDegree := newItems[p1].HotDegree + oldDegree := oldItem.HotDegree + newDegree := newItem.HotDegree // new bucket should interim old if the hot degree of the new bucket is less than zero. if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree { newItem.HotDegree = oldDegree @@ -187,7 +189,8 @@ func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) { func (b *BucketTreeItem) calculateHotDegree() { for _, stat := range b.stats { - // todo: qps should be considered, tikv will report this in next sprint + // todo: qps should be considered, tikv will report this in next sprint + // the order: read [bytes keys qps] and write[bytes keys qps] readLoads := stat.Loads[:2] readHot := slice.AllOf(readLoads, func(i int) bool { return readLoads[i] > minHotThresholds[i] diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 89f5e9357c0..e76e519c6ad 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -17,12 +17,12 @@ package buckets import ( "bytes" "context" - "github.com/tikv/pd/pkg/keyutil" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/keyutil" + "github.com/tikv/pd/pkg/logutil" "github.com/tikv/pd/pkg/rangetree" - "github.com/tikv/pd/server/core" "go.uber.org/zap" ) @@ -40,8 +40,8 @@ const ( bucketBtreeDegree = 10 // the range of the hot degree should be [-100, 100] - minHotDegree = -100 - maxHotDegree = 100 + minHotDegree = -20 + maxHotDegree = 20 ) // HotBucketCache is the cache of hot stats. @@ -52,7 +52,7 @@ type HotBucketCache struct { ctx context.Context } -// bucketDebrisFactory returns the debris. +// bucketDebrisFactory returns the debris if the key range of the item is bigger than the given key range. // like bucket tree item: | 001------------------------200| // the split key range: |050---150| // returns debris: |001-----050| |150------200| @@ -94,7 +94,7 @@ func NewBucketsCache(ctx context.Context) *HotBucketCache { // putItem puts the item into the cache. func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeItem) { // only update origin if the key range is same. - if origin := h.bucketsOfRegion[item.regionID]; item.compareKeyRange(origin) { + if origin := h.bucketsOfRegion[item.regionID]; item.equals(origin) { *origin = *item return } @@ -102,8 +102,8 @@ func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeIte if overlap.status == alive { log.Debug("delete buckets from cache", zap.Uint64("region-id", overlap.regionID), - zap.String("start-key", core.HexRegionKeyStr(overlap.GetStartKey())), - zap.String("end-key", core.HexRegionKeyStr(overlap.GetEndKey()))) + logutil.ZapRedactByteString("start-key", overlap.GetStartKey()), + logutil.ZapRedactByteString("end-key", overlap.GetEndKey())) delete(h.bucketsOfRegion, overlap.regionID) } } @@ -139,7 +139,7 @@ func (h *HotBucketCache) schedule() { func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *BucketTreeItem, overlaps []*BucketTreeItem) { newItem = convertToBucketTreeItem(buckets) // origin is existed and the version is same. - if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.compareKeyRange(origin) { + if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.equals(origin) { overlaps = []*BucketTreeItem{origin} } else { overlaps = h.getBucketsByKeyRange(newItem.startKey, newItem.endKey) From f6089f13a6c45100004fcfae4a307e6bd0d32ebb Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Thu, 19 May 2022 12:41:44 +0800 Subject: [PATCH 5/6] fix bug Signed-off-by: bufferflies <1045931706@qq.com> --- .../buckets/bucket_stat_informer.go | 6 ----- server/statistics/buckets/hot_bucket_cache.go | 14 ---------- .../buckets/hot_bucket_cache_test.go | 26 +++++++++++++------ 3 files changed, 18 insertions(+), 28 deletions(-) diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index 532b4c21017..57428377a0e 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -17,8 +17,6 @@ package buckets import ( "bytes" "fmt" - "github.com/pingcap/log" - "go.uber.org/zap" "github.com/tikv/pd/pkg/btree" "github.com/tikv/pd/pkg/keyutil" @@ -134,10 +132,6 @@ func cloneBucketItemByRange(b *BucketTreeItem, startKey, endKey []byte) *BucketT if len(endKey) == 0 { right = stat.EndKey } - log.Info("cloneBucketItemByRange", - zap.ByteString("start-key", left), - zap.ByteString("end-key", right), - ) if bytes.Compare(left, right) < 0 { copy := stat.clone() copy.StartKey = left diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 03467c348d7..b43cf6b5b11 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -72,13 +72,6 @@ func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []ra if !bytes.Equal(item.GetStartKey(), left) { res = append(res, cloneBucketItemByRange(bt, item.GetStartKey(), left)) } - log.Info("debris factory", - zap.ByteString("start-key", item.GetStartKey()), - zap.ByteString("end-key", item.GetEndKey()), - zap.ByteString("new-start-key", startKey), - zap.ByteString("new-end-key", endKey), - zap.Int("len", len(res)), - ) // there will be no debris if the right is equal to the end key. if !bytes.Equal(item.GetEndKey(), right) { if len(right) == 0 { @@ -87,13 +80,6 @@ func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []ra res = append(res, cloneBucketItemByRange(bt, right, item.GetEndKey())) } } - log.Info("debris factory", - zap.ByteString("start-key", item.GetStartKey()), - zap.ByteString("end-key", item.GetEndKey()), - zap.ByteString("new-start-key", startKey), - zap.ByteString("new-end-key", endKey), - zap.Int("len", len(res)), - ) return res } diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go index 5aa631ecd9d..63d0e14a03e 100644 --- a/server/statistics/buckets/hot_bucket_cache_test.go +++ b/server/statistics/buckets/hot_bucket_cache_test.go @@ -56,26 +56,36 @@ func (t *testHotBucketCache) TestPutItem(c *C) { treeLen: 3, }, { // case2: region split - // + // origin: |""--10-------30---""| + // new: |15 20| + // tree: |""--10--15--20--30--""| regionID: 2, keys: [][]byte{[]byte("15"), []byte("20")}, regionCount: 1, - treeLen: 3, + treeLen: 5, }, { + // case 3: region split + // origin: |""--10--15--20--30--""| + // new: |20 ---- ""| + // tree: |""--10--15--20------ ""| regionID: 1, - keys: [][]byte{[]byte("20"), []byte("30")}, + keys: [][]byte{[]byte("20"), []byte("")}, version: 3, regionCount: 2, - treeLen: 3, + treeLen: 4, }, { + // case 3: region split + // tree: |""--10--15--20------ ""| + // new: |""----------20| + // tree: |""----------20--------""| regionID: 3, - keys: [][]byte{[]byte("10"), []byte("15")}, - regionCount: 3, - treeLen: 3, + keys: [][]byte{[]byte(""), []byte("20")}, + regionCount: 2, + treeLen: 2, }, { // region 1,2,3 will be merged. regionID: 4, - keys: [][]byte{[]byte("10"), []byte("30")}, + keys: [][]byte{[]byte(""), []byte("")}, regionCount: 1, treeLen: 1, }} From d68802f5784e670b88e72ed54e4309a2a07989ba Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Thu, 19 May 2022 14:52:09 +0800 Subject: [PATCH 6/6] remove some log Signed-off-by: bufferflies <1045931706@qq.com> --- server/statistics/buckets/bucket_stat_informer.go | 10 +++------- server/statistics/buckets/hot_bucket_cache.go | 6 +++--- server/statistics/buckets/hot_bucket_cache_test.go | 7 +++++-- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/server/statistics/buckets/bucket_stat_informer.go b/server/statistics/buckets/bucket_stat_informer.go index 57428377a0e..c64ff947917 100644 --- a/server/statistics/buckets/bucket_stat_informer.go +++ b/server/statistics/buckets/bucket_stat_informer.go @@ -52,11 +52,11 @@ type BucketStat struct { func (b *BucketStat) clone() *BucketStat { c := &BucketStat{ - StartKey: b.StartKey, - EndKey: b.EndKey, RegionID: b.RegionID, HotDegree: b.HotDegree, Interval: b.Interval, + StartKey: b.StartKey, + EndKey: b.EndKey, Loads: make([]uint64, len(b.Loads)), } copy(c.Loads, b.Loads) @@ -105,16 +105,12 @@ func (b *BucketTreeItem) equals(origin *BucketTreeItem) bool { if origin == nil { return false } - // key range must be same if the version is same. - if b.version == origin.version { - return true - } return bytes.Equal(b.startKey, origin.startKey) && bytes.Equal(b.endKey, origin.endKey) } // cloneBucketItemByRange returns a new item with the same key range. // item must have some debris for the given key range -func cloneBucketItemByRange(b *BucketTreeItem, startKey, endKey []byte) *BucketTreeItem { +func (b *BucketTreeItem) cloneBucketItemByRange(startKey, endKey []byte) *BucketTreeItem { item := &BucketTreeItem{ regionID: b.regionID, startKey: startKey, diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go index 105cde52567..5aba9ffdab7 100644 --- a/server/statistics/buckets/hot_bucket_cache.go +++ b/server/statistics/buckets/hot_bucket_cache.go @@ -76,14 +76,14 @@ func bucketDebrisFactory(startKey, endKey []byte, item rangetree.RangeItem) []ra bt := item.(*BucketTreeItem) // there will be no debris if the left is equal to the start key. if !bytes.Equal(item.GetStartKey(), left) { - res = append(res, cloneBucketItemByRange(bt, item.GetStartKey(), left)) + res = append(res, bt.cloneBucketItemByRange(item.GetStartKey(), left)) } // there will be no debris if the right is equal to the end key. if !bytes.Equal(item.GetEndKey(), right) || len(right) == 0 { if len(right) == 0 { - res = append(res, cloneBucketItemByRange(bt, item.GetEndKey(), right)) + res = append(res, bt.cloneBucketItemByRange(item.GetEndKey(), right)) } else { - res = append(res, cloneBucketItemByRange(bt, right, item.GetEndKey())) + res = append(res, bt.cloneBucketItemByRange(right, item.GetEndKey())) } } return res diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go index f4d3a8a2e2f..7c8cc85e99c 100644 --- a/server/statistics/buckets/hot_bucket_cache_test.go +++ b/server/statistics/buckets/hot_bucket_cache_test.go @@ -140,7 +140,6 @@ func (t *testHotBucketCache) TestGetBucketsByKeyRange(c *C) { } func (t *testHotBucketCache) TestInherit(c *C) { - // init: key range |10 20|20-50|50-60|(3 2 10) originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("20"), []byte("50"), []byte("")}, 0)) originBucketItem.stats[0].HotDegree = 3 originBucketItem.stats[1].HotDegree = 2 @@ -165,8 +164,12 @@ func (t *testHotBucketCache) TestInherit(c *C) { // case4: newItem starKey is greater than old. buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("")}, 0), expect: []int{10}, + }, { + buckets: newTestBuckets(1, 1, [][]byte{[]byte(""), []byte("")}, 0), + expect: []int{10}, }} + // init: key range |10--20---50---60|(3 2 10) for _, v := range testdata { buckets := convertToBucketTreeItem(v.buckets) buckets.inherit([]*BucketTreeItem{originBucketItem}) @@ -217,7 +220,7 @@ func (t *testHotBucketCache) TestBucketTreeItemClone(c *C) { strict: false, }} for _, v := range testdata { - copy := cloneBucketItemByRange(origin, v.startKey, v.endKey) + copy := origin.cloneBucketItemByRange(v.startKey, v.endKey) c.Assert(copy.startKey, BytesEquals, v.startKey) c.Assert(copy.endKey, BytesEquals, v.endKey) c.Assert(copy.stats, HasLen, v.count)