diff --git a/server/balancer_test.go b/server/balancer_test.go
index 1cf786b27f76..efcc8e56585b 100644
--- a/server/balancer_test.go
+++ b/server/balancer_test.go
@@ -48,7 +48,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {
 		Id:           0,
 		MaxPeerCount: 3,
 	}
-	clusterInfo.setMeta(meta)
+	clusterInfo.putMeta(meta)

 	var (
 		id uint64
@@ -63,7 +63,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {

 		addr := fmt.Sprintf("127.0.0.1:%d", i)
 		store := s.newStore(c, id, addr)
-		clusterInfo.setStore(newStoreInfo(store))
+		clusterInfo.putStore(newStoreInfo(store))
 	}

 	// Add 1 peer, id will be 5.
@@ -76,7 +76,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {
 	c.Assert(err, IsNil)

 	region := s.newRegion(c, id, []byte{}, []byte{}, []*metapb.Peer{peer}, nil)
-	clusterInfo.setRegion(newRegionInfo(region, peer))
+	clusterInfo.putRegion(newRegionInfo(region, peer))

 	stores := clusterInfo.getStores()
 	c.Assert(stores, HasLen, 4)
@@ -101,7 +101,7 @@ func (s *testBalancerSuite) updateStore(c *C, clusterInfo *clusterInfo, storeID
 func (s *testBalancerSuite) updateStoreState(c *C, clusterInfo *clusterInfo, storeID uint64, state metapb.StoreState) {
 	store := clusterInfo.getStore(storeID)
 	store.State = state
-	clusterInfo.setStore(store)
+	clusterInfo.putStore(store)
 }

 func (s *testBalancerSuite) addRegionPeer(c *C, clusterInfo *clusterInfo, storeID uint64, region *regionInfo) {
@@ -118,7 +118,7 @@ func (s *testBalancerSuite) addRegionPeer(c *C, clusterInfo *clusterInfo, storeI

 	addRegionPeer(c, region.Region, peer)

-	clusterInfo.setRegion(region)
+	clusterInfo.putRegion(region)
 }

 func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
@@ -252,7 +252,7 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
 		Id:           0,
 		MaxPeerCount: 1,
 	}
-	clusterInfo.setMeta(meta)
+	clusterInfo.putMeta(meta)

 	testCfg.MinCapacityUsedRatio = 0.3
 	testCfg.MaxCapacityUsedRatio = 0.9
@@ -264,7 +264,7 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
 	// Set region peers to one peer.
 	peers := region.GetPeers()
 	region.Peers = []*metapb.Peer{leader}
-	clusterInfo.setRegion(region)
+	clusterInfo.putRegion(region)

 	cb = newCapacityBalancer(testCfg)
 	_, bop, err = cb.Balance(clusterInfo)
@@ -291,10 +291,10 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
 	c.Assert(bop, IsNil)

 	// Reset cluster config and region peers.
-	clusterInfo.setMeta(oldMeta)
+	clusterInfo.putMeta(oldMeta)

 	region.Peers = peers
-	clusterInfo.setRegion(region)
+	clusterInfo.putRegion(region)
 }

 // TODO: Refactor these tests, they are quite ugly now.
@@ -468,7 +468,7 @@ func (s *testBalancerSuite) TestReplicaBalancerWithDownPeers(c *C) {

 	// Now we have enough active replicas, we can remove the down peer in store 4.
 	addRegionPeer(c, region.Region, op.ChangePeer.GetPeer())
-	clusterInfo.setRegion(region)
+	clusterInfo.putRegion(region)

 	rb = newReplicaBalancer(region, s.cfg)
 	_, bop, err = rb.Balance(clusterInfo)
diff --git a/server/cache.go b/server/cache.go
index 9b1860f0eebc..6c4a8d673140 100644
--- a/server/cache.go
+++ b/server/cache.go
@@ -229,6 +229,7 @@ type clusterInfo struct {
 	sync.RWMutex

 	id      IDAllocator
+	kv      *kv
 	meta    *metapb.Cluster
 	stores  *storesInfo
 	regions *regionsInfo
@@ -242,8 +243,9 @@ func newClusterInfo(id IDAllocator) *clusterInfo {
 	}
 }

-func loadClusterInfo(id IDAllocator, kv *kv) (*clusterInfo, error) {
+func newClusterInfoWithKV(id IDAllocator, kv *kv) (*clusterInfo, error) {
 	c := newClusterInfo(id)
+	c.kv = kv

 	c.meta = &metapb.Cluster{}
 	ok, err := kv.loadMeta(c.meta)
@@ -291,10 +293,20 @@ func (c *clusterInfo) getMeta() *metapb.Cluster {
 	return proto.Clone(c.meta).(*metapb.Cluster)
 }

-func (c *clusterInfo) setMeta(meta *metapb.Cluster) {
+func (c *clusterInfo) putMeta(meta *metapb.Cluster) error {
 	c.Lock()
 	defer c.Unlock()
+	return c.innerPutMeta(proto.Clone(meta).(*metapb.Cluster))
+}
+
+func (c *clusterInfo) innerPutMeta(meta *metapb.Cluster) error {
+	if c.kv != nil {
+		if err := c.kv.saveMeta(meta); err != nil {
+			return errors.Trace(err)
+		}
+	}
 	c.meta = meta
+	return nil
 }

 func (c *clusterInfo) getStore(storeID uint64) *storeInfo {
@@ -303,10 +315,20 @@ func (c *clusterInfo) getStore(storeID uint64) *storeInfo {
 	return c.stores.getStore(storeID)
 }

-func (c *clusterInfo) setStore(store *storeInfo) {
+func (c *clusterInfo) putStore(store *storeInfo) error {
 	c.Lock()
 	defer c.Unlock()
-	c.stores.setStore(store.clone())
+	return c.innerPutStore(store.clone())
+}
+
+func (c *clusterInfo) innerPutStore(store *storeInfo) error {
+	if c.kv != nil {
+		if err := c.kv.saveStore(store.Store); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	c.stores.setStore(store)
+	return nil
 }

 func (c *clusterInfo) getStores() []*storeInfo {
@@ -339,10 +361,20 @@ func (c *clusterInfo) searchRegion(regionKey []byte) *regionInfo {
 	return c.regions.searchRegion(regionKey)
 }

-func (c *clusterInfo) setRegion(region *regionInfo) {
+func (c *clusterInfo) putRegion(region *regionInfo) error {
 	c.Lock()
 	defer c.Unlock()
-	c.regions.setRegion(region.clone())
+	return c.innerPutRegion(region.clone())
+}
+
+func (c *clusterInfo) innerPutRegion(region *regionInfo) error {
+	if c.kv != nil {
+		if err := c.kv.saveRegion(region.Region); err != nil {
+			return errors.Trace(err)
+		}
+	}
+	c.regions.setRegion(region)
+	return nil
 }

 func (c *clusterInfo) getRegions() []*regionInfo {
@@ -388,7 +420,6 @@ func (c *clusterInfo) randFollowerRegion(storeID uint64) *regionInfo {
 }

 // handleStoreHeartbeat updates the store status.
-// It returns an error if the store is not found.
 func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
 	c.Lock()
 	defer c.Unlock()
@@ -409,9 +440,7 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
 }

 // handleRegionHeartbeat updates the region information.
-// It returns true if the region meta is updated (or added).
-// It returns an error if any error occurs.
-func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) (bool, error) {
+func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) error {
 	c.Lock()
 	defer c.Unlock()

@@ -420,8 +449,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) (bool, error) {

 	// Region does not exist, add it.
 	if origin == nil {
-		c.regions.setRegion(region)
-		return true, nil
+		return c.innerPutRegion(region)
 	}

 	r := region.GetRegionEpoch()
@@ -429,16 +457,15 @@ func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) (bool, error) {

 	// Region meta is stale, return an error.
 	if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
-		return false, errors.Trace(errRegionIsStale(region.Region, origin.Region))
+		return errors.Trace(errRegionIsStale(region.Region, origin.Region))
 	}

-	// Region meta is updated, update region and return true.
+	// Region meta is updated, update kv and cache.
 	if r.GetVersion() > o.GetVersion() || r.GetConfVer() > o.GetConfVer() {
-		c.regions.setRegion(region)
-		return true, nil
+		return c.innerPutRegion(region)
 	}

-	// Region meta is the same, update region and return false.
+	// Region meta is the same, update cache only.
 	c.regions.setRegion(region)
-	return false, nil
+	return nil
 }
diff --git a/server/cache_test.go b/server/cache_test.go
index 58d5b707c206..45793c3a128d 100644
--- a/server/cache_test.go
+++ b/server/cache_test.go
@@ -182,6 +182,32 @@ var _ = Suite(&testClusterInfoSuite{})

 type testClusterInfoSuite struct{}

+func (s *testClusterInfoSuite) Test(c *C) {
+	var tests []func(*C, *clusterInfo)
+	tests = append(tests, s.testStoreHeartbeat)
+	tests = append(tests, s.testRegionHeartbeat)
+	tests = append(tests, s.testRegionSplitAndMerge)
+
+	// Test without kv.
+	{
+		for _, test := range tests {
+			cluster := newClusterInfo(newMockIDAllocator())
+			test(c, cluster)
+		}
+	}
+
+	// Test with kv.
+	{
+		for _, test := range tests {
+			server, cleanup := mustRunTestServer(c)
+			defer cleanup()
+			cluster := newClusterInfo(server.idAlloc)
+			cluster.kv = server.kv
+			test(c, cluster)
+		}
+	}
+}
+
 func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) {
 	server, cleanup := mustRunTestServer(c)
 	defer cleanup()
@@ -189,7 +215,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) {
 	kv := server.kv

 	// Cluster is not bootstrapped.
-	cluster, err := loadClusterInfo(server.idAlloc, kv)
+	cluster, err := newClusterInfoWithKV(server.idAlloc, kv)
 	c.Assert(err, IsNil)
 	c.Assert(cluster, IsNil)

@@ -200,7 +226,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) {
 	stores := mustSaveStores(c, kv, n)
 	regions := mustSaveRegions(c, kv, n)

-	cluster, err = loadClusterInfo(server.idAlloc, kv)
+	cluster, err = newClusterInfoWithKV(server.idAlloc, kv)
 	c.Assert(err, IsNil)
 	c.Assert(cluster, NotNil)

@@ -216,14 +242,13 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) {
 	}
 }

-func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
+func (s *testClusterInfoSuite) testStoreHeartbeat(c *C, cache *clusterInfo) {
 	n, np := uint64(3), uint64(3)
-	cache := newClusterInfo(newMockIDAllocator())
 	stores := newTestStores(n)
 	regions := newTestRegions(n, np)

 	for _, region := range regions {
-		cache.setRegion(region)
+		c.Assert(cache.putRegion(region), IsNil)
 	}
 	c.Assert(cache.getRegionCount(), Equals, int(n))

@@ -231,7 +256,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
 		storeStats := &pdpb.StoreStats{StoreId: store.GetId()}
 		c.Assert(cache.handleStoreHeartbeat(storeStats), NotNil)

-		cache.setStore(store)
+		c.Assert(cache.putStore(store), IsNil)
 		c.Assert(cache.getStoreCount(), Equals, int(i+1))

 		stats := store.stats
@@ -251,24 +276,30 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) {
 	}

 	c.Assert(cache.getStoreCount(), Equals, int(n))
+
+	// Test with kv.
+	if kv := cache.kv; kv != nil {
+		for _, store := range stores {
+			tmp := &metapb.Store{}
+			ok, err := kv.loadStore(store.GetId(), tmp)
+			c.Assert(ok, IsTrue)
+			c.Assert(err, IsNil)
+			c.Assert(tmp, DeepEquals, store.Store)
+		}
+	}
 }

-func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
+func (s *testClusterInfoSuite) testRegionHeartbeat(c *C, cache *clusterInfo) {
 	n, np := uint64(3), uint64(3)
-	cache := newClusterInfo(newMockIDAllocator())
 	regions := newTestRegions(n, np)

 	for i, region := range regions {
 		// region does not exist.
-		updated, err := cache.handleRegionHeartbeat(region)
-		c.Assert(updated, IsTrue)
-		c.Assert(err, IsNil)
+		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
 		checkRegions(c, cache.regions, regions[0:i+1])

 		// region is the same, not updated.
-		updated, err = cache.handleRegionHeartbeat(region)
-		c.Assert(updated, IsFalse)
-		c.Assert(err, IsNil)
+		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
 		checkRegions(c, cache.regions, regions[0:i+1])

 		epoch := region.clone().GetRegionEpoch()
@@ -277,9 +308,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 		region.RegionEpoch = &metapb.RegionEpoch{
 			Version: epoch.GetVersion() + 1,
 		}
-		updated, err = cache.handleRegionHeartbeat(region)
-		c.Assert(updated, IsTrue)
-		c.Assert(err, IsNil)
+		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
 		checkRegions(c, cache.regions, regions[0:i+1])

 		// region is stale (Version).
@@ -287,9 +316,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 		stale.RegionEpoch = &metapb.RegionEpoch{
 			ConfVer: epoch.GetConfVer() + 1,
 		}
-		updated, err = cache.handleRegionHeartbeat(stale)
-		c.Assert(updated, IsFalse)
-		c.Assert(err, NotNil)
+		c.Assert(cache.handleRegionHeartbeat(stale), NotNil)
 		checkRegions(c, cache.regions, regions[0:i+1])

 		// region is updated.
@@ -297,9 +324,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 			Version: epoch.GetVersion() + 1,
 			ConfVer: epoch.GetConfVer() + 1,
 		}
-		updated, err = cache.handleRegionHeartbeat(region)
-		c.Assert(updated, IsTrue)
-		c.Assert(err, IsNil)
+		c.Assert(cache.handleRegionHeartbeat(region), IsNil)
 		checkRegions(c, cache.regions, regions[0:i+1])

 		// region is stale (ConfVer).
@@ -307,9 +332,7 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 		stale.RegionEpoch = &metapb.RegionEpoch{
 			Version: epoch.GetVersion() + 1,
 		}
-		updated, err = cache.handleRegionHeartbeat(stale)
-		c.Assert(updated, IsFalse)
-		c.Assert(err, NotNil)
+		c.Assert(cache.handleRegionHeartbeat(stale), NotNil)
 		checkRegions(c, cache.regions, regions[0:i+1])
 	}

@@ -329,6 +352,17 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
 	for _, region := range cache.getMetaRegions() {
 		c.Assert(region, DeepEquals, regions[region.GetId()].Region)
 	}
+
+	// Test with kv.
+	if kv := cache.kv; kv != nil {
+		for _, region := range regions {
+			tmp := &metapb.Region{}
+			ok, err := kv.loadRegion(region.GetId(), tmp)
+			c.Assert(ok, IsTrue)
+			c.Assert(err, IsNil)
+			c.Assert(tmp, DeepEquals, region.Region)
+		}
+	}
 }

 func heartbeatRegions(c *C, cache *clusterInfo, regions []*metapb.Region) {
@@ -336,9 +370,7 @@ func heartbeatRegions(c *C, cache *clusterInfo, regions []*metapb.Region) {
 	for _, region := range regions {
 		r := newRegionInfo(region, nil)

-		updated, err := cache.handleRegionHeartbeat(r)
-		c.Assert(updated, IsTrue)
-		c.Assert(err, IsNil)
+		c.Assert(cache.handleRegionHeartbeat(r), IsNil)

 		checkRegion(c, cache.getRegion(r.GetId()), r)
 		checkRegion(c, cache.searchRegion(r.StartKey), r)
@@ -365,8 +397,7 @@ func heartbeatRegions(c *C, cache *clusterInfo, regions []*metapb.Region) {
 	}
 }

-func (s *testClusterInfoSuite) TestRegionHeartbeatSplitAndMerge(c *C) {
-	cache := newClusterInfo(newMockIDAllocator())
+func (s *testClusterInfoSuite) testRegionSplitAndMerge(c *C, cache *clusterInfo) {
 	regions := []*metapb.Region{
 		{
 			Id: 1,
diff --git a/server/cluster.go b/server/cluster.go
index 8ec3771b3700..debf68e62716 100644
--- a/server/cluster.go
+++ b/server/cluster.go
@@ -80,7 +80,7 @@ func (c *RaftCluster) start() error {
 		return nil
 	}

-	cluster, err := loadClusterInfo(c.s.idAlloc, c.s.kv)
+	cluster, err := newClusterInfoWithKV(c.s.idAlloc, c.s.kv)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -316,19 +316,21 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
 		return errors.Errorf("invalid put store %v", store)
 	}

+	cluster := c.cachedCluster
+
 	// There are 3 cases here:
 	// Case 1: store id exists with the same address - do nothing;
 	// Case 2: store id exists with different address - update address;
-	if s := c.cachedCluster.getStore(store.GetId()); s != nil {
+	if s := cluster.getStore(store.GetId()); s != nil {
 		if s.GetAddress() == store.GetAddress() {
 			return nil
 		}
 		s.Address = store.Address
-		return c.saveStore(s.Store)
+		return cluster.putStore(s)
 	}

 	// Case 3: store id does not exist, check duplicated address.
-	for _, s := range c.cachedCluster.getStores() {
+	for _, s := range cluster.getStores() {
 		// It's OK to start a new store on the same address if the old store has been removed.
 		if s.isTombstone() {
 			continue
@@ -337,27 +339,7 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
 			return errors.Errorf("duplicated store address: %v, already registered by %v", store, s.Store)
 		}
 	}
-	return c.saveStore(store)
-}
-
-func (c *RaftCluster) saveStore(store *metapb.Store) error {
-	storeValue, err := store.Marshal()
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	storePath := makeStoreKey(c.clusterRoot, store.GetId())
-
-	resp, err := c.s.leaderTxn().Then(clientv3.OpPut(storePath, string(storeValue))).Commit()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if !resp.Succeeded {
-		return errors.Errorf("save store %v fail", store)
-	}
-
-	c.cachedCluster.setStore(newStoreInfo(store))
-	return nil
+	return cluster.putStore(newStoreInfo(store))
 }

 // RemoveStore marks a store as offline in cluster.
@@ -366,22 +348,24 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
 	c.Lock()
 	defer c.Unlock()

-	store, _, err := c.GetStore(storeID)
-	if err != nil {
-		return errors.Trace(err)
+	cluster := c.cachedCluster
+
+	store := cluster.getStore(storeID)
+	if store == nil {
+		return errors.Trace(errStoreNotFound(storeID))
 	}

 	// Remove an offline store should be OK, nothing to do.
-	if store.State == metapb.StoreState_Offline {
+	if store.isOffline() {
 		return nil
 	}

-	if store.State == metapb.StoreState_Tombstone {
+	if store.isTombstone() {
 		return errors.New("store has been removed")
 	}

 	store.State = metapb.StoreState_Offline
-	return c.saveStore(store)
+	return cluster.putStore(store)
 }

 // BuryStore marks a store as tombstone in cluster.
@@ -392,17 +376,19 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
 	c.Lock()
 	defer c.Unlock()

-	store, _, err := c.GetStore(storeID)
-	if err != nil {
-		return errors.Trace(err)
+	cluster := c.cachedCluster
+
+	store := cluster.getStore(storeID)
+	if store == nil {
+		return errors.Trace(errStoreNotFound(storeID))
 	}

 	// Bury a tombstone store should be OK, nothing to do.
-	if store.State == metapb.StoreState_Tombstone {
+	if store.isTombstone() {
 		return nil
 	}

-	if store.State == metapb.StoreState_Up {
+	if store.isUp() {
 		if !force {
 			return errors.New("store is still up, please remove store gracefully")
 		}
@@ -410,7 +396,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error {
 	}

 	store.State = metapb.StoreState_Tombstone
-	return c.saveStore(store)
+	return cluster.putStore(store)
 }

 func (c *RaftCluster) checkStores() {
@@ -524,23 +510,7 @@ func (c *RaftCluster) putConfig(meta *metapb.Cluster) error {
 	if meta.GetId() != c.clusterID {
 		return errors.Errorf("invalid cluster %v, mismatch cluster id %d", meta, c.clusterID)
 	}
-
-	metaValue, err := meta.Marshal()
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	resp, err := c.s.leaderTxn().Then(clientv3.OpPut(c.clusterRoot, string(metaValue))).Commit()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if !resp.Succeeded {
-		return errors.Errorf("put cluster meta %v error", meta)
-	}
-
-	c.cachedCluster.setMeta(meta)
-
-	return nil
+	return c.cachedCluster.putMeta(meta)
 }

 // NewAddPeerOperator creates an operator to add a peer to the region.
diff --git a/server/cluster_test.go b/server/cluster_test.go
index 6c5e1c351419..e02ccf9ee605 100644
--- a/server/cluster_test.go
+++ b/server/cluster_test.go
@@ -358,12 +358,12 @@ func (s *testClusterSuite) testPutStore(c *C, conn net.Conn, clusterID uint64, s
 }

 func (s *testClusterSuite) resetStoreState(c *C, storeID uint64, state metapb.StoreState) {
-	cluster := s.svr.GetRaftCluster()
+	cluster := s.svr.GetRaftCluster().cachedCluster
 	c.Assert(cluster, NotNil)
-	store := cluster.cachedCluster.getStore(storeID)
+	store := cluster.getStore(storeID)
 	c.Assert(store, NotNil)
 	store.State = state
-	cluster.cachedCluster.setStore(store)
+	cluster.putStore(store)
 }

 func (s *testClusterSuite) testRemoveStore(c *C, conn net.Conn, clusterID uint64, store *metapb.Store) {
diff --git a/server/command.go b/server/command.go
index f7fed8d544e1..7193f9ac93bf 100644
--- a/server/command.go
+++ b/server/command.go
@@ -14,7 +14,6 @@
 package server

 import (
-	"github.com/coreos/etcd/clientv3"
 	"github.com/golang/protobuf/proto"
 	"github.com/juju/errors"
 	"github.com/ngaut/log"
@@ -184,7 +183,7 @@ func (c *conn) handleRegionHeartbeat(req *pdpb.Request) (*pdpb.Response, error)
 		return nil, errors.Errorf("invalid request leader, %v", request)
 	}

-	updated, err := cluster.cachedCluster.handleRegionHeartbeat(region)
+	err = cluster.cachedCluster.handleRegionHeartbeat(region)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -194,23 +193,6 @@ func (c *conn) handleRegionHeartbeat(req *pdpb.Request) (*pdpb.Response, error)
 		return nil, errors.Trace(err)
 	}

-	if updated {
-		regionValue, err := region.Marshal()
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		regionPath := makeRegionKey(cluster.clusterRoot, region.GetId())
-
-		op := clientv3.OpPut(regionPath, string(regionValue))
-		resp, err := c.s.leaderTxn().Then(op).Commit()
-		if err != nil {
-			return nil, errors.Trace(err)
-		}
-		if !resp.Succeeded {
-			return nil, errors.New("handle region heartbeat failed")
-		}
-	}
-
 	return &pdpb.Response{
 		RegionHeartbeat: res,
 	}, nil