Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/simulator: reduce range times #8318

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,7 @@ const (
PendingPeerInSubTree SubTreeRegionType = "pending"
)

// GetStoreRegions gets all RegionInfo with a given storeID
// GetStoreRegionsByTypeInSubTree gets all RegionInfo with a given storeID
func (r *RegionsInfo) GetStoreRegionsByTypeInSubTree(storeID uint64, typ SubTreeRegionType) ([]*RegionInfo, error) {
r.st.RLock()
var regions []*RegionInfo
Expand Down Expand Up @@ -2210,3 +2210,11 @@ func NewTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...Regi
}
return NewRegionInfo(metaRegion, leader, opts...)
}

// TraverseRegions executes a function on all regions.
// ONLY for simulator now and function need to be self-locked.
func (r *RegionsInfo) TraverseRegions(lockedFunc func(*RegionInfo)) {
for _, item := range r.regions {
lockedFunc(item.RegionInfo)
}
}
1 change: 1 addition & 0 deletions tools/pd-simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ EXIT:

if simulator.PDHTTPClient != nil {
simulator.PDHTTPClient.Close()
simulator.SD.Close()
}
if simResult != "OK" {
os.Exit(1)
Expand Down
10 changes: 5 additions & 5 deletions tools/pd-simulator/simulator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
// errFailInitClusterID is returned when failed to load clusterID from all supplied PD addresses.
errFailInitClusterID = errors.New("[pd] failed to get cluster id")
PDHTTPClient pdHttp.Client
sd pd.ServiceDiscovery
SD pd.ServiceDiscovery
ClusterID uint64
)

Expand Down Expand Up @@ -167,9 +167,9 @@ func (c *client) HeartbeatStreamLoop() {

// update connection to recreate heartbeat stream
for i := 0; i < retryTimes; i++ {
sd.ScheduleCheckMemberChanged()
SD.ScheduleCheckMemberChanged()
time.Sleep(leaderChangedWaitTime)
if client := sd.GetServiceClient(); client != nil {
if client := SD.GetServiceClient(); client != nil {
_, conn, err := getLeaderURL(ctx, client.GetClientConn())
if err != nil {
simutil.Logger.Error("[HeartbeatStreamLoop] failed to get leader URL", zap.Error(err))
Expand Down Expand Up @@ -351,9 +351,9 @@ func (rc *RetryClient) requestWithRetry(f func() (any, error)) (any, error) {
}
// retry to get leader URL
for i := 0; i < rc.retryCount; i++ {
sd.ScheduleCheckMemberChanged()
SD.ScheduleCheckMemberChanged()
time.Sleep(100 * time.Millisecond)
if client := sd.GetServiceClient(); client != nil {
if client := SD.GetServiceClient(); client != nil {
_, conn, err := getLeaderURL(context.Background(), client.GetClientConn())
if err != nil {
simutil.Logger.Error("[retry] failed to get leader URL", zap.Error(err))
Expand Down
6 changes: 3 additions & 3 deletions tools/pd-simulator/simulator/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,12 @@ func (d *Driver) allocID() error {
func (d *Driver) updateNodesClient() error {
urls := strings.Split(d.pdAddr, ",")
ctx, cancel := context.WithCancel(context.Background())
sd = pd.NewDefaultPDServiceDiscovery(ctx, cancel, urls, nil)
if err := sd.Init(); err != nil {
SD = pd.NewDefaultPDServiceDiscovery(ctx, cancel, urls, nil)
if err := SD.Init(); err != nil {
return err
}
// Init PD HTTP client.
PDHTTPClient = pdHttp.NewClientWithServiceDiscovery("pd-simulator", sd)
PDHTTPClient = pdHttp.NewClientWithServiceDiscovery("pd-simulator", SD)

for _, node := range d.conn.Nodes {
node.client = NewRetryClient(node)
Expand Down
9 changes: 4 additions & 5 deletions tools/pd-simulator/simulator/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
node = nodes[uint64(e.ID)]
}
if node == nil {
simutil.Logger.Error("node is not existed", zap.Uint64("node-id", node.Id))
return false
simutil.Logger.Error("node is not existed")
return true
}
delete(raft.conn.Nodes, node.Id)
// delete store
Expand All @@ -240,8 +240,7 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
}
node.Stop()

regions := raft.GetRegions()
for _, region := range regions {
raft.TraverseRegions(func(region *core.RegionInfo) {
storeIDs := region.GetStoreIDs()
if _, ok := storeIDs[node.Id]; ok {
downPeer := &pdpb.PeerStats{
Expand All @@ -251,6 +250,6 @@ func (e *DownNode) Run(raft *RaftEngine, _ int64) bool {
region = region.Clone(core.WithDownPeers(append(region.GetDownPeers(), downPeer)))
raft.SetRegion(region)
}
}
})
return true
}
6 changes: 3 additions & 3 deletions tools/pd-simulator/simulator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/docker/go-units"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/tools/pd-simulator/simulator/cases"
Expand Down Expand Up @@ -204,8 +205,7 @@ func (n *Node) regionHeartBeat() {
if n.GetNodeState() != metapb.NodeState_Preparing && n.GetNodeState() != metapb.NodeState_Serving {
return
}
regions := n.raftEngine.GetRegions()
for _, region := range regions {
n.raftEngine.TraverseRegions(func(region *core.RegionInfo) {
if region.GetLeader() != nil && region.GetLeader().GetStoreId() == n.Id {
ctx, cancel := context.WithTimeout(n.ctx, pdTimeout)
err := n.client.RegionHeartbeat(ctx, region)
Expand All @@ -217,7 +217,7 @@ func (n *Node) regionHeartBeat() {
}
cancel()
}
}
})
}

func (n *Node) reportRegionChange() {
Expand Down
13 changes: 5 additions & 8 deletions tools/pd-simulator/simulator/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,10 @@ func NewRaftEngine(conf *cases.Case, conn *Connection, storeConfig *config.SimCo
}

func (r *RaftEngine) stepRegions() {
regions := r.GetRegions()
for _, region := range regions {
r.TraverseRegions(func(region *core.RegionInfo) {
r.stepLeader(region)
r.stepSplit(region)
}
})
}

func (r *RaftEngine) stepLeader(region *core.RegionInfo) {
Expand Down Expand Up @@ -265,11 +264,9 @@ func (r *RaftEngine) ResetRegionChange(storeID uint64, regionID uint64) {
}
}

// GetRegions gets all RegionInfo from regionMap
func (r *RaftEngine) GetRegions() []*core.RegionInfo {
r.RLock()
defer r.RUnlock()
return r.regionsInfo.GetRegions()
// TraverseRegions executes a function on all regions, and function need to be self-locked.
func (r *RaftEngine) TraverseRegions(lockedFunc func(*core.RegionInfo)) {
r.regionsInfo.TraverseRegions(lockedFunc)
}

// SetRegion sets the RegionInfo with regionID
Expand Down
Loading