Merge branch 'master' into split_buckets
ti-chi-bot[bot] authored Jul 31, 2023
2 parents 5de3840 + 16926ad commit df5f059
Showing 15 changed files with 344 additions and 63 deletions.
9 changes: 9 additions & 0 deletions pkg/core/region.go
@@ -64,6 +64,7 @@ type RegionInfo struct {
readBytes uint64
readKeys uint64
approximateSize int64
approximateKvSize int64
approximateKeys int64
interval *pdpb.TimeInterval
replicationStatus *replication_modepb.RegionReplicationStatus
@@ -151,6 +152,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
regionKvSize := heartbeat.GetApproximateKvSize() / units.MiB

region := &RegionInfo{
term: heartbeat.GetTerm(),
@@ -164,6 +166,7 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
readBytes: heartbeat.GetBytesRead(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKvSize: int64(regionKvSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
interval: heartbeat.GetInterval(),
replicationStatus: heartbeat.GetReplicationStatus(),
@@ -230,6 +233,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
readBytes: r.readBytes,
readKeys: r.readKeys,
approximateSize: r.approximateSize,
approximateKvSize: r.approximateKvSize,
approximateKeys: r.approximateKeys,
interval: typeutil.DeepClone(r.interval, TimeIntervalFactory),
replicationStatus: r.replicationStatus,
@@ -520,6 +524,11 @@ func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
return r.approximateKeys
}

// GetApproximateKvSize returns the approximate kv size of the region.
func (r *RegionInfo) GetApproximateKvSize() int64 {
return r.approximateKvSize
}

// GetApproximateKeys returns the approximate keys of the region.
func (r *RegionInfo) GetApproximateKeys() int64 {
return r.approximateKeys
7 changes: 7 additions & 0 deletions pkg/core/region_option.go
@@ -274,6 +274,13 @@ func SetApproximateSize(v int64) RegionCreateOption {
}
}

// SetApproximateKvSize sets the approximate kv size for the region.
func SetApproximateKvSize(v int64) RegionCreateOption {
return func(region *RegionInfo) {
region.approximateKvSize = v
}
}

// SetApproximateKeys sets the approximate keys for the region.
func SetApproximateKeys(v int64) RegionCreateOption {
return func(region *RegionInfo) {
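
Taken together, the region.go and region_option.go hunks add an approximateKvSize field that is filled from the heartbeat (converted to MiB) and exposed through a create option and a getter. Below is a minimal sketch, not part of the commit, of how a caller might exercise the new field through the public API; the metapb region literal and the concrete sizes are illustrative assumptions.

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/pkg/core"
)

func main() {
	meta := &metapb.Region{Id: 1, Peers: []*metapb.Peer{{Id: 101, StoreId: 1}}}
	// Build a RegionInfo with both the existing approximate size and the new
	// approximate kv size; both values are interpreted as MiB.
	region := core.NewRegionInfo(meta, meta.Peers[0],
		core.SetApproximateSize(96),
		core.SetApproximateKvSize(64), // hypothetical value, added by this commit
		core.SetApproximateKeys(200_000),
	)
	fmt.Println(region.GetApproximateSize())   // 96
	fmt.Println(region.GetApproximateKvSize()) // 64
}
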
31 changes: 31 additions & 0 deletions pkg/gc/metrics.go
@@ -0,0 +1,31 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gc

import "github.com/prometheus/client_golang/prometheus"

var (
gcSafePointGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
Subsystem: "gc",
Name: "gc_safepoint",
Help: "The ts of gc safepoint",
}, []string{"type"})
)

func init() {
prometheus.MustRegister(gcSafePointGauge)
}
3 changes: 3 additions & 0 deletions pkg/gc/safepoint.go
@@ -64,6 +64,9 @@ func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafe
return
}
err = manager.store.SaveGCSafePoint(newSafePoint)
if err == nil {
gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))
}
return
}

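
The two gc hunks register a pd_gc_gc_safepoint gauge and update it only after SaveGCSafePoint persists the new value. The self-contained sketch below mirrors that pattern outside PD; the saveGCSafePoint closure is a hypothetical stand-in for manager.store.SaveGCSafePoint, and the timestamp is made up.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// gcSafePointGauge mirrors the gauge added in pkg/gc/metrics.go.
var gcSafePointGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "gc",
		Name:      "gc_safepoint",
		Help:      "The ts of gc safepoint",
	}, []string{"type"})

func main() {
	prometheus.MustRegister(gcSafePointGauge)

	// Hypothetical persistence step; the real code sets the gauge only when
	// store.SaveGCSafePoint returns nil.
	saveGCSafePoint := func(ts uint64) error { return nil }

	newSafePoint := uint64(1690761600) // stand-in safepoint timestamp
	if err := saveGCSafePoint(newSafePoint); err == nil {
		gcSafePointGauge.WithLabelValues("gc_safepoint").Set(float64(newSafePoint))
	}

	fmt.Println(testutil.ToFloat64(gcSafePointGauge.WithLabelValues("gc_safepoint")))
}
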
47 changes: 37 additions & 10 deletions pkg/keyspace/keyspace.go
@@ -194,14 +194,6 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
if err != nil {
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
return nil, err
}
userKind := endpoint.StringUserKind(request.Config[UserKindKey])
config, err := manager.kgm.GetKeyspaceConfigByKind(userKind)
if err != nil {
@@ -215,16 +207,51 @@ func (manager *Manager) CreateKeyspace(request *CreateKeyspaceRequest) (*keyspac
request.Config[UserKindKey] = config[UserKindKey]
}
}
// Create and save keyspace metadata.
// Create a disabled keyspace meta for tikv-server to get the config on keyspace split.
keyspace := &keyspacepb.KeyspaceMeta{
Id: newID,
Name: request.Name,
State: keyspacepb.KeyspaceState_ENABLED,
State: keyspacepb.KeyspaceState_DISABLED,
CreatedAt: request.CreateTime,
StateChangedAt: request.CreateTime,
Config: request.Config,
}
err = manager.saveNewKeyspace(keyspace)
if err != nil {
log.Warn("[keyspace] failed to save keyspace before split",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err),
)
return nil, err
}
// If the request to create a keyspace is pre-allocated when the PD starts,
// there is no need to wait for the region split, because TiKV has not started.
waitRegionSplit := !request.IsPreAlloc && manager.config.ToWaitRegionSplit()
// Split keyspace region.
err = manager.splitKeyspaceRegion(newID, waitRegionSplit)
if err != nil {
err2 := manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
idPath := endpoint.KeyspaceIDPath(request.Name)
metaPath := endpoint.KeyspaceMetaPath(newID)
e := txn.Remove(idPath)
if e != nil {
return e
}
return txn.Remove(metaPath)
})
if err2 != nil {
log.Warn("[keyspace] failed to remove pre-created keyspace after split failed",
zap.Uint32("keyspace-id", keyspace.GetId()),
zap.String("name", keyspace.GetName()),
zap.Error(err2),
)
}
return nil, err
}
// enable the keyspace metadata after split.
keyspace.State = keyspacepb.KeyspaceState_ENABLED
_, err = manager.UpdateKeyspaceStateByID(newID, keyspacepb.KeyspaceState_ENABLED, request.CreateTime)
if err != nil {
log.Warn("[keyspace] failed to create keyspace",
zap.Uint32("keyspace-id", keyspace.GetId()),
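
The keyspace.go hunk reorders creation: the meta is first persisted in DISABLED state so tikv-server can read the config during the split, the keyspace region is then split (with a best-effort removal of the id and meta paths if the split fails), and only afterwards is the keyspace switched to ENABLED. A compressed, hypothetical sketch of that ordering, using stand-in types and helpers instead of the real keyspacepb/Manager/store APIs:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for keyspacepb.KeyspaceMeta, manager.saveNewKeyspace,
// txn.Remove, manager.splitKeyspaceRegion, and UpdateKeyspaceStateByID.
type keyspaceMeta struct {
	ID    uint32
	Name  string
	State string // "DISABLED" or "ENABLED"
}

var store = map[uint32]*keyspaceMeta{}

func saveNewKeyspace(m *keyspaceMeta) error { store[m.ID] = m; return nil }
func removeKeyspace(id uint32) error        { delete(store, id); return nil }

func splitKeyspaceRegion(id uint32, wait bool) error {
	if id == 0 {
		return errors.New("split failed")
	}
	return nil
}

func createKeyspace(id uint32, name string, isPreAlloc bool) (*keyspaceMeta, error) {
	// 1. Persist a DISABLED meta before splitting.
	meta := &keyspaceMeta{ID: id, Name: name, State: "DISABLED"}
	if err := saveNewKeyspace(meta); err != nil {
		return nil, err
	}
	// 2. Split the keyspace region; pre-allocated keyspaces do not wait
	//    because TiKV has not started yet.
	if err := splitKeyspaceRegion(id, !isPreAlloc); err != nil {
		_ = removeKeyspace(id) // best-effort rollback of the pre-created meta
		return nil, err
	}
	// 3. Only after a successful split is the keyspace enabled.
	meta.State = "ENABLED"
	return meta, nil
}

func main() {
	ks, err := createKeyspace(1, "demo", false)
	fmt.Println(ks, err)
}
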
101 changes: 58 additions & 43 deletions pkg/schedule/schedulers/hot_region.go
@@ -43,6 +43,7 @@ import (
)

var (
topnPosition = 10
statisticsInterval = time.Second
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
hotSchedulerCounter = schedulerCounter.WithLabelValues(HotRegionName, "schedule")
@@ -451,11 +452,13 @@ func isAvailableV1(s *solution) bool {

type balanceSolver struct {
sche.SchedulerCluster
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
rwTy statistics.RWType
opTy opType
resourceTy resourceType
sche *hotScheduler
stLoadDetail map[uint64]*statistics.StoreLoadDetail
filteredHotPeers map[uint64][]*statistics.HotPeerStat // storeID -> hotPeers(filtered)
nthHotPeer map[uint64][]*statistics.HotPeerStat // storeID -> [dimLen]hotPeers
rwTy statistics.RWType
opTy opType
resourceTy resourceType

cur *solution

@@ -494,8 +497,21 @@ type balanceSolver struct {
}

func (bs *balanceSolver) init() {
// Init store load detail according to the type.
// Load the configuration items of the scheduler.
bs.resourceTy = toResourceType(bs.rwTy, bs.opTy)
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()
switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}

// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[bs.resourceTy]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
@@ -508,10 +524,14 @@ func (bs *balanceSolver) init() {
}
maxCur := &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}

bs.filteredHotPeers = make(map[uint64][]*statistics.HotPeerStat)
bs.nthHotPeer = make(map[uint64][]*statistics.HotPeerStat)
for _, detail := range bs.stLoadDetail {
bs.maxSrc = statistics.MaxLoad(bs.maxSrc, detail.LoadPred.Min())
bs.minDst = statistics.MinLoad(bs.minDst, detail.LoadPred.Max())
maxCur = statistics.MaxLoad(maxCur, &detail.LoadPred.Current)
bs.nthHotPeer[detail.GetID()] = make([]*statistics.HotPeerStat, statistics.DimLen)
bs.filteredHotPeers[detail.GetID()] = bs.filterHotPeers(detail)
}

rankStepRatios := []float64{
@@ -526,19 +546,6 @@ func (bs *balanceSolver) init() {
Loads: stepLoads,
Count: maxCur.Count * bs.sche.conf.GetCountRankStepRatio(),
}

bs.firstPriority, bs.secondPriority = prioritiesToDim(bs.getPriorities())
bs.greatDecRatio, bs.minorDecRatio = bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorDecRatio()
bs.maxPeerNum = bs.sche.conf.GetMaxPeerNumber()
bs.minHotDegree = bs.GetSchedulerConfig().GetHotRegionCacheHitsThreshold()
bs.isRaftKV2 = bs.GetStoreConfig().IsRaftKV2()

switch bs.sche.conf.GetRankFormulaVersion() {
case "v1":
bs.initRankV1()
default:
bs.initRankV2()
}
}

func (bs *balanceSolver) initRankV1() {
@@ -662,7 +669,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore
srcStoreID := srcStore.GetID()
for _, mainPeerStat := range bs.filterHotPeers(srcStore) {
for _, mainPeerStat := range bs.filteredHotPeers[srcStoreID] {
if bs.cur.region = bs.getRegion(mainPeerStat, srcStoreID); bs.cur.region == nil {
continue
} else if bs.opTy == movePeer {
@@ -690,7 +697,7 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if bs.needSearchRevertRegions() {
hotSchedulerSearchRevertRegionsCounter.Inc()
dstStoreID := dstStore.GetID()
for _, revertPeerStat := range bs.filterHotPeers(bs.cur.dstStore) {
for _, revertPeerStat := range bs.filteredHotPeers[dstStoreID] {
revertRegion := bs.getRegion(revertPeerStat, dstStoreID)
if revertRegion == nil || revertRegion.GetID() == bs.cur.region.GetID() ||
!allowRevertRegion(revertRegion, srcStoreID) {
@@ -871,44 +878,52 @@ func (bs *balanceSolver) checkSrcHistoryLoadsByPriorityAndTolerance(current, exp

// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) []*statistics.HotPeerStat {
hotPeers := storeLoad.HotPeers
ret := make([]*statistics.HotPeerStat, 0, len(hotPeers))
appendItem := func(item *statistics.HotPeerStat) {
if _, ok := bs.sche.regionPendings[item.ID()]; !ok && !item.IsNeedCoolDownTransferLeader(bs.minHotDegree, bs.rwTy) {
// not in a pending operator and no need to cool down after a leader transfer
ret = append(ret, item)
}
}

src := storeLoad.HotPeers
// At most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
if len(src) <= bs.maxPeerNum {
ret = make([]*statistics.HotPeerStat, 0, len(src))
for _, peer := range src {
appendItem(peer)
}
} else {
union := bs.sortHotPeers(src)
var firstSort, secondSort []*statistics.HotPeerStat
if len(hotPeers) >= topnPosition || len(hotPeers) > bs.maxPeerNum {
firstSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(firstSort, hotPeers)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort = make([]*statistics.HotPeerStat, len(hotPeers))
copy(secondSort, hotPeers)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
}
if len(hotPeers) >= topnPosition {
storeID := storeLoad.GetID()
bs.nthHotPeer[storeID][bs.firstPriority] = firstSort[topnPosition-1]
bs.nthHotPeer[storeID][bs.secondPriority] = secondSort[topnPosition-1]
}
if len(hotPeers) > bs.maxPeerNum {
union := bs.sortHotPeers(firstSort, secondSort)
ret = make([]*statistics.HotPeerStat, 0, len(union))
for peer := range union {
appendItem(peer)
}
return ret
}

return
for _, peer := range hotPeers {
appendItem(peer)
}
return ret
}

func (bs *balanceSolver) sortHotPeers(ret []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
firstSort := make([]*statistics.HotPeerStat, len(ret))
copy(firstSort, ret)
sort.Slice(firstSort, func(i, j int) bool {
return firstSort[i].GetLoad(bs.firstPriority) > firstSort[j].GetLoad(bs.firstPriority)
})
secondSort := make([]*statistics.HotPeerStat, len(ret))
copy(secondSort, ret)
sort.Slice(secondSort, func(i, j int) bool {
return secondSort[i].GetLoad(bs.secondPriority) > secondSort[j].GetLoad(bs.secondPriority)
})
func (bs *balanceSolver) sortHotPeers(firstSort, secondSort []*statistics.HotPeerStat) map[*statistics.HotPeerStat]struct{} {
union := make(map[*statistics.HotPeerStat]struct{}, bs.maxPeerNum)
// At most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
for len(union) < bs.maxPeerNum {
for len(firstSort) > 0 {
peer := firstSort[0]
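
The hot_region.go changes precompute, per store, the filtered hot peers and the N-th hottest peer (topnPosition = 10) in each priority dimension during init(), instead of re-sorting inside solve(). The sketch below is a simplified approximation of the new filterHotPeers/sortHotPeers logic, not the scheduler's exact union order; peerStat and loads are stand-ins for statistics.HotPeerStat and its GetLoad method.

package main

import (
	"fmt"
	"sort"
)

const topnPosition = 10 // same default as the scheduler

type peerStat struct {
	regionID uint64
	loads    [2]float64 // indexes 0/1 stand in for the first/second priority dimension
}

// filterTopHotPeers caps the candidate set at maxPeerNum by taking the union of
// the peers that rank highest in either dimension, and reports the N-th hottest
// peer per dimension (the new nthHotPeer bookkeeping).
func filterTopHotPeers(hotPeers []peerStat, maxPeerNum int) (candidates []peerStat, nth [2]*peerStat) {
	sortByDim := func(dim int) []peerStat {
		s := make([]peerStat, len(hotPeers))
		copy(s, hotPeers)
		sort.Slice(s, func(i, j int) bool { return s[i].loads[dim] > s[j].loads[dim] })
		return s
	}
	var firstSort, secondSort []peerStat
	if len(hotPeers) >= topnPosition || len(hotPeers) > maxPeerNum {
		firstSort, secondSort = sortByDim(0), sortByDim(1)
	}
	if len(hotPeers) >= topnPosition {
		nth[0], nth[1] = &firstSort[topnPosition-1], &secondSort[topnPosition-1]
	}
	if len(hotPeers) <= maxPeerNum {
		return hotPeers, nth
	}
	// Walk both sorted lists in parallel until maxPeerNum distinct peers are collected.
	seen := make(map[uint64]struct{}, maxPeerNum)
	for i := 0; len(seen) < maxPeerNum && i < len(hotPeers); i++ {
		for _, p := range []peerStat{firstSort[i], secondSort[i]} {
			if len(seen) >= maxPeerNum {
				break
			}
			if _, ok := seen[p.regionID]; !ok {
				seen[p.regionID] = struct{}{}
				candidates = append(candidates, p)
			}
		}
	}
	return candidates, nth
}

func main() {
	peers := []peerStat{
		{1, [2]float64{100, 1}}, {2, [2]float64{90, 2}}, {3, [2]float64{1, 200}},
	}
	got, _ := filterTopHotPeers(peers, 2)
	fmt.Println(got) // regions 1 and 3: the hottest in each dimension
}
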
11 changes: 7 additions & 4 deletions pkg/schedule/schedulers/hot_region_test.go
@@ -1981,20 +1981,23 @@ func TestHotCacheSortHotPeer(t *testing.T) {
},
}}

st := &statistics.StoreLoadDetail{
HotPeers: hotPeers,
}
leaderSolver.maxPeerNum = 1
u := leaderSolver.sortHotPeers(hotPeers)
u := leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1}, u)

leaderSolver.maxPeerNum = 2
u = leaderSolver.sortHotPeers(hotPeers)
u = leaderSolver.filterHotPeers(st)
checkSortResult(re, []uint64{1, 2}, u)
}

func checkSortResult(re *require.Assertions, regions []uint64, hotPeers map[*statistics.HotPeerStat]struct{}) {
func checkSortResult(re *require.Assertions, regions []uint64, hotPeers []*statistics.HotPeerStat) {
re.Equal(len(hotPeers), len(regions))
for _, region := range regions {
in := false
for hotPeer := range hotPeers {
for _, hotPeer := range hotPeers {
if hotPeer.RegionID == region {
in = true
break