Skip to content

Commit

Permalink
schedule: fix the zero store limit (#2564)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored Jun 22, 2020
1 parent 11ed61f commit 68a9215
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 25 deletions.
5 changes: 3 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1684,18 +1684,19 @@ func (c *RaftCluster) GetAllStoresLimit() map[uint64]config.StoreLimitConfig {
}

// AddStoreLimit add a store limit for a given store ID.
func (c *RaftCluster) AddStoreLimit(storeID uint64, isTiFlashStore bool) {
func (c *RaftCluster) AddStoreLimit(store *metapb.Store) {
cfg := c.opt.GetScheduleConfig().Clone()
sc := config.StoreLimitConfig{
AddPeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: config.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
if isTiFlashStore {
if core.IsTiFlashStore(store) {
sc = config.StoreLimitConfig{
AddPeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: config.DefaultTiFlashStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
}
storeID := store.GetId()
cfg.StoreLimit[storeID] = sc
c.opt.SetScheduleConfig(cfg)
}
Expand Down
6 changes: 3 additions & 3 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ var (

// StoreLimit is the default limit of adding peer and removing peer when putting stores.
type StoreLimit struct {
mu sync.Mutex
mu sync.RWMutex
// AddPeer is the default rate of adding peers for store limit (per minute).
AddPeer float64
// RemovePeer is the default rate of removing peers for store limit (per minute).
Expand All @@ -247,8 +247,8 @@ func (sl *StoreLimit) SetDefaultStoreLimit(typ storelimit.Type, ratePerMin float

// GetDefaultStoreLimit gets the default store limit for a given type.
func (sl *StoreLimit) GetDefaultStoreLimit(typ storelimit.Type) float64 {
sl.mu.Lock()
defer sl.mu.Unlock()
sl.mu.RLock()
defer sl.mu.RUnlock()
switch typ {
case storelimit.AddPeer:
return sl.AddPeer
Expand Down
11 changes: 10 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,13 @@ func (o *PersistOptions) SetAllStoresLimit(typ storelimit.Type, ratePerMin float
v := o.GetScheduleConfig().Clone()
switch typ {
case storelimit.AddPeer:
DefaultStoreLimit.SetDefaultStoreLimit(storelimit.AddPeer, ratePerMin)
for storeID := range v.StoreLimit {
sc := StoreLimitConfig{AddPeer: ratePerMin, RemovePeer: v.StoreLimit[storeID].RemovePeer}
v.StoreLimit[storeID] = sc
}
case storelimit.RemovePeer:
DefaultStoreLimit.SetDefaultStoreLimit(storelimit.RemovePeer, ratePerMin)
for storeID := range v.StoreLimit {
sc := StoreLimitConfig{AddPeer: v.StoreLimit[storeID].AddPeer, RemovePeer: ratePerMin}
v.StoreLimit[storeID] = sc
Expand Down Expand Up @@ -270,7 +272,14 @@ func (o *PersistOptions) GetStoreLimit(storeID uint64) StoreLimitConfig {
if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok {
return limit
}
return StoreLimitConfig{0, 0}
cfg := o.GetScheduleConfig().Clone()
sc := StoreLimitConfig{
AddPeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer),
RemovePeer: DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer),
}
cfg.StoreLimit[storeID] = sc
o.SetScheduleConfig(cfg)
return o.GetScheduleConfig().StoreLimit[storeID]
}

// GetStoreLimitByType returns the limit of a store with a given type.
Expand Down
11 changes: 11 additions & 0 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,14 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo
s.SetStore(newStore)
}
}

// IsTiFlashStore used to judge flash store.
// FIXME: remove the hack way
func IsTiFlashStore(store *metapb.Store) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == "engine" && l.GetValue() == "tiflash" {
return true
}
}
return false
}
8 changes: 2 additions & 6 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
}

// NOTE: can be removed when placement rules feature is enabled by default.
if !s.GetConfig().Replication.EnablePlacementRules && isTiFlashStore(store) {
if !s.GetConfig().Replication.EnablePlacementRules && core.IsTiFlashStore(store) {
return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled")
}

Expand All @@ -235,11 +235,7 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
log.Info("put store ok", zap.Stringer("store", store))
rc.OnStoreVersionChange()
CheckPDVersion(s.persistOptions)
if isTiFlashStore(store) {
rc.AddStoreLimit(store.GetId(), true /* isTiFlashStore*/)
} else {
rc.AddStoreLimit(store.GetId(), false /* isTiFlashStore*/)
}
rc.AddStoreLimit(store)

return &pdpb.PutStoreResponse{
Header: s.header(),
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
} else {
// NOTE: can be removed after placement rules feature is enabled by default.
for _, s := range raftCluster.GetStores() {
if !s.IsTombstone() && isTiFlashStore(s.GetMeta()) {
if !s.IsTombstone() && core.IsTiFlashStore(s.GetMeta()) {
return errors.New("cannot disable placement rules with TiFlash nodes")
}
}
Expand Down
12 changes: 0 additions & 12 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"path"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pingcap/pd/v4/pkg/etcdutil"
Expand Down Expand Up @@ -182,14 +181,3 @@ func checkBootstrapRequest(clusterID uint64, req *pdpb.BootstrapRequest) error {

return nil
}

// isTiFlashStore used to judge flash store.
// FIXME: remove the hack way
func isTiFlashStore(store *metapb.Store) bool {
for _, l := range store.GetLabels() {
if l.GetKey() == "engine" && l.GetValue() == "tiflash" {
return true
}
}
return false
}
55 changes: 55 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,3 +960,58 @@ func (s *clusterTestSuite) TestOfflineStoreLimit(c *C) {
c.Assert(oc.RemoveOperator(op), IsTrue)
}
}

func (s *clusterTestSuite) TestUpgradeStoreLimit(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
c.Assert(err, IsNil)
err = tc.RunInitialServers()
c.Assert(err, IsNil)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr())
clusterID := leaderServer.GetClusterID()
bootstrapCluster(c, clusterID, grpcPDClient, "127.0.0.1:0")
rc := leaderServer.GetRaftCluster()
c.Assert(rc, NotNil)
rc.SetStorage(core.NewStorage(kv.NewMemoryKV()))
store := newMetaStore(1, "127.0.1.1:0", "4.0.0", metapb.StoreState_Up)
_, err = putStore(c, grpcPDClient, clusterID, store)
c.Assert(err, IsNil)
r := &metapb.Region{
Id: 1,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
StartKey: []byte{byte(2)},
EndKey: []byte{byte(3)},
Peers: []*metapb.Peer{{Id: 11, StoreId: uint64(1)}},
}
region := core.NewRegionInfo(r, r.Peers[0], core.SetApproximateSize(10))

err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)

// restart PD
// Here we use an empty storelimit to simulate the upgrade progress.
opt := rc.GetOpt()
scheduleCfg := opt.GetScheduleConfig()
scheduleCfg.StoreLimit = map[uint64]config.StoreLimitConfig{}
c.Assert(leaderServer.GetServer().SetScheduleConfig(*scheduleCfg), IsNil)
err = leaderServer.Stop()
c.Assert(err, IsNil)
err = leaderServer.Run()
c.Assert(err, IsNil)

oc := rc.GetOperatorController()
// only can add 5 remove peer operators on store 1
for i := uint64(1); i <= 5; i++ {
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{ConfVer: 1, Version: 1}, operator.OpRegion, operator.RemovePeer{FromStore: 1})
c.Assert(oc.AddOperator(op), IsTrue)
c.Assert(oc.RemoveOperator(op), IsTrue)
}
op := operator.NewOperator("test", "test", 1, &metapb.RegionEpoch{ConfVer: 1, Version: 1}, operator.OpRegion, operator.RemovePeer{FromStore: 1})
c.Assert(oc.AddOperator(op), IsFalse)
c.Assert(oc.RemoveOperator(op), IsFalse)
}

0 comments on commit 68a9215

Please sign in to comment.