diff --git a/server/cluster_info.go b/server/cluster_info.go
index 61850210728..9cd8d593db5 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -453,7 +453,6 @@ func (c *clusterInfo) updateStoreStatusLocked(id uint64) {
 
 // handleRegionHeartbeat updates the region information.
 func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
-	region = region.Clone()
 	c.RLock()
 	origin := c.core.Regions.GetRegion(region.GetID())
 	isWriteUpdate, writeItem := c.core.CheckWriteStatus(region)
diff --git a/server/cluster_test.go b/server/cluster_test.go
index 1258aad9ce1..542d81884d9 100644
--- a/server/cluster_test.go
+++ b/server/cluster_test.go
@@ -15,12 +15,15 @@ package server
 
 import (
 	"context"
+	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/coreos/etcd/clientv3"
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	"github.com/pingcap/pd/server/core"
 	"google.golang.org/grpc"
 )
 
@@ -257,7 +260,6 @@ func (s *testClusterSuite) TestGetPutConfig(c *C) {
 	// Get store.
 	storeID := peer.GetStoreId()
 	store := s.getStore(c, clusterID, storeID)
-	c.Assert(store.GetAddress(), Equals, storeAddr)
 
 	// Update store.
 	store.Address = "127.0.0.1:1"
@@ -435,3 +437,86 @@ func (s *testClusterSuite) TestGetPDMembers(c *C) {
 	// A more strict test can be found at api/member_test.go
 	c.Assert(len(resp.GetMembers()), Not(Equals), 0)
 }
+
+func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) {
+	storeAddrs := []string{"127.0.1.1:0", "127.0.1.1:1", "127.0.1.1:2"}
+	s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0"))
+	s.svr.cluster.cachedCluster.Lock()
+	s.svr.cluster.cachedCluster.kv = core.NewKV(core.NewMemoryKV())
+	s.svr.cluster.cachedCluster.Unlock()
+	var stores []*metapb.Store
+	for _, addr := range storeAddrs {
+		store := s.newStore(c, 0, addr)
+		stores = append(stores, store)
+		_, err := putStore(c, s.grpcPDClient, s.svr.clusterID, store)
+		c.Assert(err, IsNil)
+	}
+
+	var wg sync.WaitGroup
+	// register store and bind stream
+	for i, store := range stores {
+		req := &pdpb.StoreHeartbeatRequest{
+			Header: newRequestHeader(s.svr.clusterID),
+			Stats: &pdpb.StoreStats{
+				StoreId:   store.GetId(),
+				Capacity:  1000 * (1 << 20),
+				Available: 1000 * (1 << 20),
+			},
+		}
+		_, err := s.svr.StoreHeartbeat(context.TODO(), req)
+		c.Assert(err, IsNil)
+		stream, err := s.grpcPDClient.RegionHeartbeat(context.Background())
+		c.Assert(err, IsNil)
+		peer := &metapb.Peer{Id: s.allocID(c), StoreId: store.GetId()}
+		regionReq := &pdpb.RegionHeartbeatRequest{
+			Header: newRequestHeader(s.svr.clusterID),
+			Region: &metapb.Region{
+				Id:    s.allocID(c),
+				Peers: []*metapb.Peer{peer},
+			},
+			Leader: peer,
+		}
+		err = stream.Send(regionReq)
+		c.Assert(err, IsNil)
+		// make sure the first store can receive one response
+		if i == 0 {
+			wg.Add(1)
+		}
+		go func(isReceiver bool) {
+			if isReceiver {
+				resp, err := stream.Recv()
+				c.Assert(err, IsNil)
+				c.Assert(resp.Header.GetError(), IsNil)
+				fmt.Println("get resp:", resp)
+				wg.Done()
+			}
+			for {
+				resp, err := stream.Recv()
+				c.Assert(err, IsNil)
+				c.Assert(resp.Header.GetError(), IsNil)
+			}
+		}(i == 0)
+	}
+	concurrent := 2000
+	for i := 0; i < concurrent; i++ {
+		region := &metapb.Region{
+			Id:       s.allocID(c),
+			StartKey: []byte(fmt.Sprintf("%5d", i)),
+			EndKey:   []byte(fmt.Sprintf("%5d", i+1)),
+			Peers:    []*metapb.Peer{{Id: s.allocID(c), StoreId: stores[0].GetId()}},
+		}
+		if i == 0 {
+			region.StartKey = []byte("")
+		} else if i == concurrent-1 {
+			region.EndKey = []byte("")
+		}
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			err := s.svr.cluster.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
+			c.Assert(err, IsNil)
+		}()
+	}
+	wg.Wait()
+}