Skip to content

Commit

Permalink
server/cache: add kv to cluster
Browse files Browse the repository at this point in the history
It's safer to update the cache and kv consistently inside the cluster.
  • Loading branch information
huachaohuang committed Nov 8, 2016
1 parent 9b507f1 commit 8d9b537
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 135 deletions.
20 changes: 10 additions & 10 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {
Id: 0,
MaxPeerCount: 3,
}
clusterInfo.setMeta(meta)
clusterInfo.putMeta(meta)

var (
id uint64
Expand All @@ -63,7 +63,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {

addr := fmt.Sprintf("127.0.0.1:%d", i)
store := s.newStore(c, id, addr)
clusterInfo.setStore(newStoreInfo(store))
clusterInfo.putStore(newStoreInfo(store))
}

// Add 1 peer, id will be 5.
Expand All @@ -76,7 +76,7 @@ func (s *testBalancerSuite) newClusterInfo(c *C) *clusterInfo {
c.Assert(err, IsNil)

region := s.newRegion(c, id, []byte{}, []byte{}, []*metapb.Peer{peer}, nil)
clusterInfo.setRegion(newRegionInfo(region, peer))
clusterInfo.putRegion(newRegionInfo(region, peer))

stores := clusterInfo.getStores()
c.Assert(stores, HasLen, 4)
Expand All @@ -101,7 +101,7 @@ func (s *testBalancerSuite) updateStore(c *C, clusterInfo *clusterInfo, storeID
func (s *testBalancerSuite) updateStoreState(c *C, clusterInfo *clusterInfo, storeID uint64, state metapb.StoreState) {
store := clusterInfo.getStore(storeID)
store.State = state
clusterInfo.setStore(store)
clusterInfo.putStore(store)
}

func (s *testBalancerSuite) addRegionPeer(c *C, clusterInfo *clusterInfo, storeID uint64, region *regionInfo) {
Expand All @@ -118,7 +118,7 @@ func (s *testBalancerSuite) addRegionPeer(c *C, clusterInfo *clusterInfo, storeI

addRegionPeer(c, region.Region, peer)

clusterInfo.setRegion(region)
clusterInfo.putRegion(region)
}

func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
Expand Down Expand Up @@ -252,7 +252,7 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
Id: 0,
MaxPeerCount: 1,
}
clusterInfo.setMeta(meta)
clusterInfo.putMeta(meta)

testCfg.MinCapacityUsedRatio = 0.3
testCfg.MaxCapacityUsedRatio = 0.9
Expand All @@ -264,7 +264,7 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
// Set region peers to one peer.
peers := region.GetPeers()
region.Peers = []*metapb.Peer{leader}
clusterInfo.setRegion(region)
clusterInfo.putRegion(region)

cb = newCapacityBalancer(testCfg)
_, bop, err = cb.Balance(clusterInfo)
Expand All @@ -291,10 +291,10 @@ func (s *testBalancerSuite) TestCapacityBalancer(c *C) {
c.Assert(bop, IsNil)

// Reset cluster config and region peers.
clusterInfo.setMeta(oldMeta)
clusterInfo.putMeta(oldMeta)

region.Peers = peers
clusterInfo.setRegion(region)
clusterInfo.putRegion(region)
}

// TODO: Refactor these tests, they are quite ugly now.
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *testBalancerSuite) TestReplicaBalancerWithDownPeers(c *C) {

// Now we have enough active replicas, we can remove the down peer in store 4.
addRegionPeer(c, region.Region, op.ChangePeer.GetPeer())
clusterInfo.setRegion(region)
clusterInfo.putRegion(region)

rb = newReplicaBalancer(region, s.cfg)
_, bop, err = rb.Balance(clusterInfo)
Expand Down
63 changes: 45 additions & 18 deletions server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type clusterInfo struct {
sync.RWMutex

id IDAllocator
kv *kv
meta *metapb.Cluster
stores *storesInfo
regions *regionsInfo
Expand All @@ -242,8 +243,9 @@ func newClusterInfo(id IDAllocator) *clusterInfo {
}
}

func loadClusterInfo(id IDAllocator, kv *kv) (*clusterInfo, error) {
func newClusterInfoWithKV(id IDAllocator, kv *kv) (*clusterInfo, error) {
c := newClusterInfo(id)
c.kv = kv

c.meta = &metapb.Cluster{}
ok, err := kv.loadMeta(c.meta)
Expand Down Expand Up @@ -291,10 +293,20 @@ func (c *clusterInfo) getMeta() *metapb.Cluster {
return proto.Clone(c.meta).(*metapb.Cluster)
}

func (c *clusterInfo) setMeta(meta *metapb.Cluster) {
// putMeta saves a deep copy of the cluster meta, persisting it to kv
// (when one is attached) before updating the in-memory cache.
func (c *clusterInfo) putMeta(meta *metapb.Cluster) error {
	// Clone up front so the caller's copy can never alias cached state.
	cloned := proto.Clone(meta).(*metapb.Cluster)
	c.Lock()
	defer c.Unlock()
	return c.innerPutMeta(cloned)
}

// innerPutMeta writes the cluster meta to kv first (if configured) and
// only updates the cache on success, keeping cache and kv consistent.
// Callers must hold the write lock.
func (c *clusterInfo) innerPutMeta(meta *metapb.Cluster) error {
	// No backing store: cache-only update.
	if c.kv == nil {
		c.meta = meta
		return nil
	}
	if err := c.kv.saveMeta(meta); err != nil {
		return errors.Trace(err)
	}
	c.meta = meta
	return nil
}

func (c *clusterInfo) getStore(storeID uint64) *storeInfo {
Expand All @@ -303,10 +315,20 @@ func (c *clusterInfo) getStore(storeID uint64) *storeInfo {
return c.stores.getStore(storeID)
}

func (c *clusterInfo) setStore(store *storeInfo) {
// putStore saves a copy of the store, persisting it to kv (when one is
// attached) before updating the in-memory cache.
// Returns an error if the kv write fails; the cache is left untouched then.
func (c *clusterInfo) putStore(store *storeInfo) error {
	c.Lock()
	defer c.Unlock()
	// Delegate once with a clone; innerPutStore handles both the kv write
	// and the cache update, so no direct setStore call belongs here.
	return c.innerPutStore(store.clone())
}

// innerPutStore writes the store meta to kv first (if configured) and
// only updates the cache on success, keeping cache and kv consistent.
// Callers must hold the write lock.
func (c *clusterInfo) innerPutStore(store *storeInfo) error {
	// No backing store: cache-only update.
	if c.kv == nil {
		c.stores.setStore(store)
		return nil
	}
	if err := c.kv.saveStore(store.Store); err != nil {
		return errors.Trace(err)
	}
	c.stores.setStore(store)
	return nil
}

func (c *clusterInfo) getStores() []*storeInfo {
Expand Down Expand Up @@ -339,10 +361,20 @@ func (c *clusterInfo) searchRegion(regionKey []byte) *regionInfo {
return c.regions.searchRegion(regionKey)
}

func (c *clusterInfo) setRegion(region *regionInfo) {
// putRegion saves a copy of the region, persisting it to kv (when one is
// attached) before updating the in-memory cache.
// Returns an error if the kv write fails; the cache is left untouched then.
func (c *clusterInfo) putRegion(region *regionInfo) error {
	c.Lock()
	defer c.Unlock()
	// Delegate once with a clone; innerPutRegion handles both the kv write
	// and the cache update, so no direct setRegion call belongs here.
	return c.innerPutRegion(region.clone())
}

// innerPutRegion writes the region meta to kv first (if configured) and
// only updates the cache on success, keeping cache and kv consistent.
// Callers must hold the write lock.
func (c *clusterInfo) innerPutRegion(region *regionInfo) error {
	// No backing store: cache-only update.
	if c.kv == nil {
		c.regions.setRegion(region)
		return nil
	}
	if err := c.kv.saveRegion(region.Region); err != nil {
		return errors.Trace(err)
	}
	c.regions.setRegion(region)
	return nil
}

func (c *clusterInfo) getRegions() []*regionInfo {
Expand Down Expand Up @@ -388,7 +420,6 @@ func (c *clusterInfo) randFollowerRegion(storeID uint64) *regionInfo {
}

// handleStoreHeartbeat updates the store status.
// It returns an error if the store is not found.
func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
c.Lock()
defer c.Unlock()
Expand All @@ -409,9 +440,7 @@ func (c *clusterInfo) handleStoreHeartbeat(stats *pdpb.StoreStats) error {
}

// handleRegionHeartbeat updates the region information.
// It returns true if the region meta is updated (or added).
// It returns an error if any error occurs.
func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) (bool, error) {
func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) error {
c.Lock()
defer c.Unlock()

Expand All @@ -420,25 +449,23 @@ func (c *clusterInfo) handleRegionHeartbeat(region *regionInfo) (bool, error) {

// Region does not exist, add it.
if origin == nil {
c.regions.setRegion(region)
return true, nil
return c.innerPutRegion(region)
}

r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()

// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return false, errors.Trace(errRegionIsStale(region.Region, origin.Region))
return errors.Trace(errRegionIsStale(region.Region, origin.Region))
}

// Region meta is updated, update region and return true.
// Region meta is updated, update kv and cache.
if r.GetVersion() > o.GetVersion() || r.GetConfVer() > o.GetConfVer() {
c.regions.setRegion(region)
return true, nil
return c.innerPutRegion(region)
}

// Region meta is the same, update region and return false.
// Region meta is the same, update cache only.
c.regions.setRegion(region)
return false, nil
return nil
}
Loading

0 comments on commit 8d9b537

Please sign in to comment.