Commit
tests: add store limit test (#7214)
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx authored Oct 16, 2023
1 parent 4176c1d commit 8b2896b
Showing 1 changed file with 121 additions and 0 deletions.
121 changes: 121 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -28,7 +28,9 @@ import (
	"github.com/pingcap/kvproto/pkg/pdpb"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"github.com/tikv/pd/pkg/core/storelimit"
	mcs "github.com/tikv/pd/pkg/mcs/utils"
	"github.com/tikv/pd/pkg/schedule/operator"
	"github.com/tikv/pd/pkg/schedule/schedulers"
	"github.com/tikv/pd/pkg/utils/testutil"
	"github.com/tikv/pd/server"
@@ -377,3 +379,122 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
		reflect.DeepEqual(region.GetDownPeers(), downPeers) && reflect.DeepEqual(region.GetPendingPeers(), pendingPeers)
	})
}

func (suite *serverTestSuite) TestStoreLimit() {
	re := suite.Require()
	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
	re.NoError(err)
	defer tc.Destroy()
	tc.WaitForPrimaryServing(re)

	oc := tc.GetPrimaryServer().GetCluster().GetCoordinator().GetOperatorController()
	leaderServer := suite.pdLeader.GetServer()
	conf := leaderServer.GetReplicationConfig().Clone()
	conf.MaxReplicas = 1
	leaderServer.SetReplicationConfig(*conf)
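	// Register two Up stores via the PD gRPC API; store 2 is the target of the add-peer and remove-peer operators below.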
	grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
	for i := uint64(1); i <= 2; i++ {
		resp, err := grpcPDClient.PutStore(
			context.Background(), &pdpb.PutStoreRequest{
				Header: &pdpb.RequestHeader{ClusterId: suite.pdLeader.GetClusterID()},
				Store: &metapb.Store{
					Id:      i,
					Address: fmt.Sprintf("mock://%d", i),
					State:   metapb.StoreState_Up,
					Version: "7.0.0",
				},
			},
		)
		re.NoError(err)
		re.Empty(resp.GetHeader().GetError())
	}

	stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
	re.NoError(err)
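	// Report regions 2..10, each with a single ~10 MiB peer on store 1, so the operators below have regions to act on.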
	for i := uint64(2); i <= 10; i++ {
		peers := []*metapb.Peer{{Id: i, StoreId: 1}}
		regionReq := &pdpb.RegionHeartbeatRequest{
			Header: testutil.NewRequestHeader(suite.pdLeader.GetClusterID()),
			Region: &metapb.Region{
				Id:       i,
				Peers:    peers,
				StartKey: []byte(fmt.Sprintf("t%d", i)),
				EndKey:   []byte(fmt.Sprintf("t%d", i+1)),
			},
			Leader:          peers[0],
			ApproximateSize: 10 * units.MiB,
		}
		err = stream.Send(regionReq)
		re.NoError(err)
	}

	leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.AddPeer, 60)
	leaderServer.GetRaftCluster().SetStoreLimit(1, storelimit.RemovePeer, 60)
	leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 60)
	leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60)
	// There is a time window between setting the store limit on the API service side and the scheduling service capturing the change.
	waitSyncFinish(re, tc, storelimit.AddPeer, 60)
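	// With an add-peer limit of 60 and 10 MiB regions, the budget only admits five add-peer operators; the sixth should be rejected.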
	for i := uint64(1); i <= 5; i++ {
		op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100})
		checkOperatorSuccess(re, oc, op)
	}
	op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100})
	checkOperatorFail(re, oc, op)

	leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.AddPeer, 120)
	waitSyncFinish(re, tc, storelimit.AddPeer, 120)
	for i := uint64(1); i <= 10; i++ {
		op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100})
		checkOperatorSuccess(re, oc, op)
	}
	leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.AddPeer, 60)
	waitSyncFinish(re, tc, storelimit.AddPeer, 60)
	for i := uint64(1); i <= 5; i++ {
		op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100})
		checkOperatorSuccess(re, oc, op)
	}
	op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddPeer{ToStore: 2, PeerID: 100})
	checkOperatorFail(re, oc, op)

	leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 60)
	waitSyncFinish(re, tc, storelimit.RemovePeer, 60)
	for i := uint64(1); i <= 5; i++ {
		op := operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2})
		checkOperatorSuccess(re, oc, op)
	}
	op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2})
	checkOperatorFail(re, oc, op)

	leaderServer.GetRaftCluster().SetStoreLimit(2, storelimit.RemovePeer, 120)
	waitSyncFinish(re, tc, storelimit.RemovePeer, 120)
	for i := uint64(1); i <= 10; i++ {
		op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2})
		checkOperatorSuccess(re, oc, op)
	}
	leaderServer.GetRaftCluster().SetAllStoresLimit(storelimit.RemovePeer, 60)
	waitSyncFinish(re, tc, storelimit.RemovePeer, 60)
	for i := uint64(1); i <= 5; i++ {
		op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2})
		checkOperatorSuccess(re, oc, op)
	}
	op = operator.NewTestOperator(2, &metapb.RegionEpoch{}, operator.OpRegion, operator.RemovePeer{FromStore: 2})
	checkOperatorFail(re, oc, op)
}

func checkOperatorSuccess(re *require.Assertions, oc *operator.Controller, op *operator.Operator) {
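	// The operator should be admitted by the controller, removable afterwards, and reported as finished.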
	re.True(oc.AddOperator(op))
	re.True(oc.RemoveOperator(op))
	re.True(op.IsEnd())
	re.Equal(op, oc.GetOperatorStatus(op.RegionID()).Operator)
}

func checkOperatorFail(re *require.Assertions, oc *operator.Controller, op *operator.Operator) {
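	// The operator should be rejected (here, because the store limit is exhausted), so it can never be added or removed.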
	re.False(oc.AddOperator(op))
	re.False(oc.RemoveOperator(op))
}

func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ storelimit.Type, expectedLimit float64) {
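	// Poll until the scheduling service's persisted config reflects the store limit just set for store 2.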
	testutil.Eventually(re, func() bool {
		return tc.GetPrimaryServer().GetPersistConfig().GetStoreLimitByType(2, typ) == expectedLimit
	})
}
