Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: remove extra region clone when handle region heartbeat #1195

Merged
merged 13 commits into from
Sep 6, 2018
1 change: 0 additions & 1 deletion server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,6 @@ func (c *clusterInfo) updateStoreStatusLocked(id uint64) {

// handleRegionHeartbeat updates the region information.
func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
region = region.Clone()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The region is used after calling this function. Are you sure that it is safe to not clone? See: https://github.com/pingcap/pd/blob/9b0763fb0474c657c1abbef8238871f3d1284817/server/cluster_worker.go#L28-L41

Copy link
Contributor Author

@nolouch nolouch Aug 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, all function only read it. More reassuring, I clone the region which has operator now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not make sense to clone the region. Cloning the region reads the region too.

I'm not so sure that the region is inmutable. I think you can propose another PR to change all fields of core.RegionInfo to unexported fields and add read only methods to access them. After that we can be sure that the RegionInfo is inmutable and discard the Clone method of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

c.RLock()
origin := c.core.Regions.GetRegion(region.GetID())
isWriteUpdate, writeItem := c.core.CheckWriteStatus(region)
Expand Down
87 changes: 86 additions & 1 deletion server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package server

import (
"context"
"fmt"
"strings"
"sync"

"github.com/coreos/etcd/clientv3"
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -257,7 +260,6 @@ func (s *testClusterSuite) TestGetPutConfig(c *C) {
// Get store.
storeID := peer.GetStoreId()
store := s.getStore(c, clusterID, storeID)
c.Assert(store.GetAddress(), Equals, storeAddr)

// Update store.
store.Address = "127.0.0.1:1"
Expand Down Expand Up @@ -436,3 +438,86 @@ func (s *testClusterSuite) TestGetPDMembers(c *C) {
// A more strict test can be found at api/member_test.go
c.Assert(len(resp.GetMembers()), Not(Equals), 0)
}

func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) {
storeAddrs := []string{"127.0.1.1:0", "127.0.1.1:1", "127.0.1.1:2"}
s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0"))
s.svr.cluster.cachedCluster.Lock()
s.svr.cluster.cachedCluster.kv = core.NewKV(core.NewMemoryKV())
s.svr.cluster.cachedCluster.Unlock()
var stores []*metapb.Store
for _, addr := range storeAddrs {
store := s.newStore(c, 0, addr)
stores = append(stores, store)
_, err := putStore(c, s.grpcPDClient, s.svr.clusterID, store)
c.Assert(err, IsNil)
}

var wg sync.WaitGroup
// register store and bind stream
for i, store := range stores {
req := &pdpb.StoreHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Stats: &pdpb.StoreStats{
StoreId: store.GetId(),
Capacity: 1000 * (1 << 20),
Available: 1000 * (1 << 20),
},
}
_, err := s.svr.StoreHeartbeat(context.TODO(), req)
c.Assert(err, IsNil)
stream, err := s.grpcPDClient.RegionHeartbeat(context.Background())
c.Assert(err, IsNil)
peer := &metapb.Peer{Id: s.allocID(c), StoreId: store.GetId()}
regionReq := &pdpb.RegionHeartbeatRequest{
Header: newRequestHeader(s.svr.clusterID),
Region: &metapb.Region{
Id: s.allocID(c),
Peers: []*metapb.Peer{peer},
},
Leader: peer,
}
err = stream.Send(regionReq)
c.Assert(err, IsNil)
// make sure the first store can receive one response
if i == 0 {
wg.Add(1)
}
go func(isReciver bool) {
if isReciver {
resp, err := stream.Recv()
c.Assert(err, IsNil)
c.Assert(resp.Header.GetError(), IsNil)
fmt.Println("get resp:", resp)
wg.Done()
}
for {
resp, err := stream.Recv()
c.Assert(err, IsNil)
c.Assert(resp.Header.GetError(), IsNil)
}
}(i == 0)
}
concurrent := 2000
for i := 0; i < concurrent; i++ {
region := &metapb.Region{
Id: s.allocID(c),
StartKey: []byte(fmt.Sprintf("%5d", i)),
EndKey: []byte(fmt.Sprintf("%5d", i+1)),
Peers: []*metapb.Peer{{Id: s.allocID(c), StoreId: stores[0].GetId()}},
}
if i == 0 {
region.StartKey = []byte("")
} else if i == concurrent-1 {
region.EndKey = []byte("")
}

wg.Add(1)
go func() {
defer wg.Done()
err := s.svr.cluster.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0]))
c.Assert(err, IsNil)
}()
}
wg.Wait()
}