Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: some fixes for release 3.0 #1624

Merged
merged 5 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
+ Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552)
+ Add a option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596)
+ Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601)
+ Fix the issue about checking the number of replicas before scheduling for hot region scheduler [#1609](https://github.com/pingcap/pd/pull/1609)
+ Set influence for the operator according to the region size [#1613](https://github.com/pingcap/pd/pull/1613)
+ Enlarge the default limit of the hot region scheduler [#1616](https://github.com/pingcap/pd/pull/1616)
+ Fix the issue about ignoring the pending peer when balancing regions [#1617](https://github.com/pingcap/pd/pull/1617)

## v3.0.0

Expand Down
1 change: 1 addition & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ leader-schedule-limit = 4
region-schedule-limit = 64
replica-schedule-limit = 64
merge-schedule-limit = 8
hot-region-schedule-limit = 4
#enable-one-way-merge = false
#tolerant-size-ratio = 0.0

Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
defaultSplitMergeInterval = 0
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 60
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI
// UpdateStoreLabels updates a store's location labels.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
c.RLock()
defer c.RUnlock()
store := c.cachedCluster.GetStore(storeID)
c.RUnlock()
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
Expand Down
7 changes: 7 additions & 0 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
return c.core.RandFollowerRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
c.RLock()
defer c.RUnlock()
return c.core.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (c *clusterInfo) GetAverageRegionSize() int64 {
c.RLock()
Expand Down
4 changes: 2 additions & 2 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ const (
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 15
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
Expand Down
5 changes: 5 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
return bc.Regions.RandLeaderRegion(storeID, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return bc.Regions.RandPendingRegion(storeID, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
func (bc *BasicCluster) GetAverageRegionSize() int64 {
return bc.Regions.GetAverageRegionSize()
Expand Down
9 changes: 7 additions & 2 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,17 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.regions, opts...)
}

// RandLeaderRegion get a store's leader region by random
// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], opts...)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], opts...)
}

// RandFollowerRegion get a store's follower region by random
// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], opts...)
}
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
}
}

// HealthRegionAllowPending checks if the region is healthy with allowing the pending peer.
func HealthRegionAllowPending() RegionOption {
return func(region *RegionInfo) bool {
return len(region.downPeers) == 0 && len(region.learners) == 0
}
}

// RegionCreateOption used to create region.
type RegionCreateOption func(region *RegionInfo)

Expand Down
14 changes: 9 additions & 5 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,15 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
return true
}

if f.MoveRegion && f.filterMoveRegion(opt, store) {
return true
if f.MoveRegion {
// only target consider the pending peers because pending more means the disk is slower.
if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}

if f.filterMoveRegion(opt, store) {
return true
}
}
return false
}
Expand All @@ -430,9 +437,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
return true
}

if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}
if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
Expand Down
45 changes: 45 additions & 0 deletions server/schedule/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package schedule

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
)

func TestChecker(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testMergeCheckerSuite{})

type testMergeCheckerSuite struct {
Expand Down Expand Up @@ -257,3 +263,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})
}

func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
oc := NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
s.cluster.ScheduleOptions.StoreBalanceRate = 60
s.regions[2] = s.regions[2].Clone(
core.SetPeers([]*metapb.Peer{
{Id: 109, StoreId: 2},
{Id: 110, StoreId: 3},
{Id: 111, StoreId: 6},
}),
core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
)
s.cluster.PutRegion(s.regions[2])
ops := s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is less or equal than 1MB.
for i := 0; i < 50; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
s.regions[2] = s.regions[2].Clone(
core.SetApproximateSize(2),
core.SetApproximateKeys(2),
)
s.cluster.PutRegion(s.regions[2])
ops = s.mc.Check(s.regions[2])
c.Assert(ops, NotNil)
// The size of Region is more than 1MB but no more than 20MB.
for i := 0; i < 5; i++ {
c.Assert(oc.AddOperator(ops...), IsTrue)
for _, op := range ops {
oc.RemoveOperator(op)
}
}
c.Assert(oc.AddOperator(ops...), IsFalse)
}
23 changes: 19 additions & 4 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
RegionOperatorWaitTime = 10 * time.Minute
// RegionInfluence represents the influence of a operator step, which is used by ratelimit.
RegionInfluence int64 = 1000
// smallRegionInfluence represents the influence of a operator step
// when the region size is smaller than smallRegionThreshold, which is used by ratelimit.
smallRegionInfluence int64 = 200
// smallRegionThreshold is used to represent a region which can be regarded as a small region once the size is small than it.
smallRegionThreshold int64 = 20
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

to.RegionSize += region.GetApproximateSize()
regionSize := region.GetApproximateSize()
to.RegionSize += regionSize
to.RegionCount++
to.StepCost += RegionInfluence
if regionSize > smallRegionThreshold {
to.StepCost += RegionInfluence
} else if regionSize <= smallRegionThreshold && regionSize > core.EmptyRegionApproximateSize {
to.StepCost += smallRegionInfluence
}
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand Down
34 changes: 17 additions & 17 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
leader = peer
}
}
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
return regionInfo
}

Expand Down Expand Up @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -2,
RegionSize: -10,
RegionSize: -50,
RegionCount: -2,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 0,
StepCost: 1000,
})
Expand Down
18 changes: 12 additions & 6 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ import (
"go.uber.org/zap"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
// RegionSetInformer provides access to a shared informer of regions.
// TODO: move to core package
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *core.RegionInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo
}

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
RegionSetInformer
GetStores() []*core.StoreInfo
GetStore(id uint64) *core.StoreInfo
GetRegion(id uint64) *core.RegionInfo

GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo

BlockStore(id uint64) error
UnblockStore(id uint64)

Expand Down
Loading