
Commit

address comment
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
CabinfeverB committed Dec 14, 2023
1 parent 261b2d0 commit 0fd9d66
Showing 3 changed files with 69 additions and 12 deletions.
65 changes: 54 additions & 11 deletions internal/locate/region_cache.go
@@ -552,8 +552,8 @@ func (c *RegionCache) clear() {
}

// thread unsafe, should use with lock
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
func (c *RegionCache) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool {
return c.mu.insertRegionToCache(cachedRegion, invalidateOldRegion)
}

// Close releases region cache's resource.
@@ -1107,8 +1107,20 @@ func (c *RegionCache) findRegionByKey(bo *retry.Backoffer, key []byte, isEndKey
logutil.Eventf(bo.GetCtx(), "load region %d from pd, due to cache-miss", lr.GetID())
r = lr
c.mu.Lock()
c.insertRegionToCache(r, true)
stale := !c.insertRegionToCache(r, true)
c.mu.Unlock()
// just retry once, it won't bring much overhead.
if stale {
lr, err = c.loadRegion(bo, key, isEndKey)
if err != nil {
// no region data, return error if failure.
return nil, err
}
r = lr
c.mu.Lock()
c.insertRegionToCache(r, true)
c.mu.Unlock()
}
} else if r.checkNeedReloadAndMarkUpdated() {
// load region when it is marked as needing reload.
lr, err := c.loadRegion(bo, key, isEndKey)
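The hunk above carries the behavioral core of this commit: `insertRegionToCache` now reports whether the freshly loaded region was accepted, and `findRegionByKey` reloads from PD exactly once when it was rejected as stale. A minimal, self-contained sketch of that caller-side pattern (the types and function names below are illustrative stand-ins, not the client-go ones):

```go
package main

import (
	"errors"
	"fmt"
)

// region is a stand-in for the client's *Region; only its identity matters here.
type region struct{ id uint64 }

// loadWithSingleRetry sketches the retry-once-on-stale pattern from
// findRegionByKey above: load, try to insert, and reload exactly once if the
// cache rejects the loaded region as stale.
func loadWithSingleRetry(
	load func() (*region, error),
	insert func(r *region) (fresh bool),
) (*region, error) {
	r, err := load()
	if err != nil {
		return nil, err
	}
	if !insert(r) {
		// The loaded region is older than what the cache already holds
		// (e.g. it came from a lagging PD follower). One extra load keeps
		// the overhead bounded, so retry exactly once.
		if r, err = load(); err != nil {
			return nil, err
		}
		insert(r) // result deliberately ignored on the second attempt
	}
	return r, nil
}

func main() {
	calls := 0
	load := func() (*region, error) {
		calls++
		if calls > 2 {
			return nil, errors.New("unexpected third load")
		}
		return &region{id: uint64(calls)}, nil
	}
	// Pretend the first loaded copy is stale and the second one is fresh.
	insert := func(r *region) bool { return r.id != 1 }
	r, err := loadWithSingleRetry(load, insert)
	fmt.Println(r.id, calls, err) // 2 2 <nil>
}
```

Capping the retry at a single attempt matches the "just retry once" comment above: one extra PD load bounds the overhead even if the follower keeps lagging.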
@@ -1485,7 +1497,32 @@ func (mu *regionIndexMu) removeVersionFromCache(oldVer RegionVerID, regionID uin
// It should be protected by c.mu.l.Lock().
// if `invalidateOldRegion` is false, the old region cache should be still valid,
// and it may still be used by some kv requests.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) {
// It returns true when the region is inserted, and false when the incoming region is stale and is not inserted.
func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOldRegion bool) bool {
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
// There are two main situations in which the region we get may be stale.
// The first is concurrent loading: several requests may fetch the same region,
// and a stale copy can arrive later due to network delays.
// The second is that the region may be obtained from a PD follower, which can
// lag behind the PD leader before they synchronize.
// So we should check the epoch before inserting.
if ok && (latest.GetVer() > newVer.GetVer() || latest.GetConfVer() > newVer.GetConfVer()) {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()),
zap.Uint64("lastest-ver", latest.GetVer()), zap.Uint64("lastest-conf", latest.GetConfVer()))
return false
}
// Also check the intersecting regions.
intersectedRegions := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range intersectedRegions {
if region.cachedRegion.meta.GetRegionEpoch().GetVersion() > newVer.GetVer() {
logutil.BgLogger().Debug("get stale region",
zap.Uint64("region", newVer.GetID()), zap.Uint64("ver", newVer.GetVer()), zap.Uint64("conf", newVer.GetConfVer()),
zap.Uint64("intersecting-ver", region.cachedRegion.meta.GetRegionEpoch().GetVersion()))
return false
}
}
oldRegion := mu.sorted.ReplaceOrInsert(cachedRegion)
if oldRegion != nil {
store := cachedRegion.getStore()
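The check above rejects an incoming region when the cached latest version, or any overlapping cached region, already carries a higher epoch. A standalone illustration of that comparison rule (plain structs below, not the kvproto/metapb types):

```go
package main

import "fmt"

// epoch mirrors the two counters of a region epoch: Ver changes on
// split/merge, ConfVer changes on membership (peer) changes.
type epoch struct {
	Ver, ConfVer uint64
}

// isStale reports whether `incoming` is older than what the cache already
// holds, following the check sketched in insertRegionToCache above.
func isStale(cachedLatest, incoming epoch) bool {
	return cachedLatest.Ver > incoming.Ver || cachedLatest.ConfVer > incoming.ConfVer
}

func main() {
	cached := epoch{Ver: 7, ConfVer: 3}

	fromFollower := epoch{Ver: 6, ConfVer: 3} // lagging follower: older split version
	fmt.Println(isStale(cached, fromFollower)) // true: reject and retry the load

	fresh := epoch{Ver: 7, ConfVer: 4} // newer membership change
	fmt.Println(isStale(cached, fresh)) // false: safe to insert
}
```

The second comparison on `ConfVer` matters because a membership change alone bumps `ConfVer` without touching `Ver`.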
@@ -1513,21 +1550,27 @@ func (mu *regionIndexMu) insertRegionToCache(cachedRegion *Region, invalidateOld
if store.buckets == nil || (oldRegionStore.buckets != nil && store.buckets.GetVersion() < oldRegionStore.buckets.GetVersion()) {
store.buckets = oldRegionStore.buckets
}
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
// Only delete when IDs are different, because we will update right away.
if cachedRegion.VerID().id != oldRegion.VerID().id {
mu.removeVersionFromCache(oldRegion.VerID(), cachedRegion.VerID().id)
}
}
// update related vars.
mu.regions[cachedRegion.VerID()] = cachedRegion
newVer := cachedRegion.VerID()
latest, ok := mu.latestVersions[cachedRegion.VerID().id]
latest, ok = mu.latestVersions[cachedRegion.VerID().id]
if !ok || latest.GetVer() < newVer.GetVer() || latest.GetConfVer() < newVer.GetConfVer() {
mu.latestVersions[cachedRegion.VerID().id] = newVer
}
// The intersecting regions in the cache are probably stale, clear them.
deleted := mu.sorted.removeIntersecting(cachedRegion)
for _, region := range deleted {
for _, region := range intersectedRegions {
mu.removeVersionFromCache(region.cachedRegion.VerID(), region.cachedRegion.GetID())
}
return true
}

// searchCachedRegion finds a region from cache by key. Like `getCachedRegion`,
// it should be called with c.mu.RLock(), and the returned Region should not be
// used after c.mu is RUnlock().
// If the given key is the end key of the region that you want, you may set the second argument to true. This is useful
@@ -1555,7 +1598,7 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region {
}
latestRegion, ok := c.mu.regions[ver]
if !ok {
// should not happen
// Should not happen. If it happens, the latest-version index is inconsistent with the cached regions.
logutil.BgLogger().Warn("region version not found",
zap.Uint64("regionID", regionID), zap.Stringer("version", &ver))
return nil
2 changes: 2 additions & 0 deletions internal/locate/region_cache_test.go
@@ -1567,6 +1567,8 @@ func (s *testRegionCacheSuite) TestBuckets() {

// update buckets if it's nil.
cachedRegion.getStore().buckets = nil
// replace the region's version in the mock cluster; otherwise the region reloaded from it would be rejected as stale.
s.cluster.PutRegion(r.GetId(), newMeta.RegionEpoch.ConfVer, newMeta.RegionEpoch.Version, []uint64{s.store1, s.store2}, []uint64{s.peer1, s.peer2}, s.peer1)
s.cluster.SplitRegionBuckets(cachedRegion.GetID(), defaultBuckets.Keys, defaultBuckets.Version)
s.cache.UpdateBucketsIfNeeded(cachedRegion.VerID(), defaultBuckets.GetVersion())
waitUpdateBuckets(defaultBuckets, []byte("a"))
14 changes: 13 additions & 1 deletion internal/mockstore/mocktikv/cluster.go
Expand Up @@ -413,6 +413,14 @@ func (c *Cluster) Bootstrap(regionID uint64, storeIDs, peerIDs []uint64, leaderP
c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID)
}

// PutRegion adds or replaces a region.
func (c *Cluster) PutRegion(regionID, confVer, ver uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) {
c.Lock()
defer c.Unlock()

c.regions[regionID] = newRegion(regionID, storeIDs, peerIDs, leaderPeerID, confVer, ver)
}

// AddPeer adds a new Peer for the Region on the Store.
func (c *Cluster) AddPeer(regionID, storeID, peerID uint64) {
c.Lock()
@@ -634,7 +642,7 @@ func newPeerMeta(peerID, storeID uint64) *metapb.Peer {
}
}

func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64) *Region {
func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64, epoch ...uint64) *Region {
if len(storeIDs) != len(peerIDs) {
panic("len(storeIDs) != len(peerIds)")
}
@@ -647,6 +655,10 @@ func newRegion(regionID uint64, storeIDs, peerIDs []uint64, leaderPeerID uint64)
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{},
}
if len(epoch) == 2 {
meta.RegionEpoch.ConfVer = epoch[0]
meta.RegionEpoch.Version = epoch[1]
}
return &Region{
Meta: meta,
leader: leaderPeerID,
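The variadic `epoch` parameter keeps existing `newRegion` callers such as `Bootstrap` unchanged while letting the new `PutRegion` stamp an explicit `ConfVer`/`Version`, which is what the `TestBuckets` change above relies on. A small sketch of that optional-trailing-arguments pattern (simplified standalone types, not the mocktikv ones):

```go
package main

import "fmt"

// RegionEpoch stands in for metapb.RegionEpoch in this sketch.
type RegionEpoch struct{ ConfVer, Version uint64 }

// newRegionEpoch mirrors the variadic-epoch handling in newRegion above:
// callers may pass zero or exactly two trailing values (confVer, ver);
// anything else leaves the epoch at its zero value.
func newRegionEpoch(regionID uint64, epoch ...uint64) RegionEpoch {
	e := RegionEpoch{}
	if len(epoch) == 2 {
		e.ConfVer = epoch[0]
		e.Version = epoch[1]
	}
	return e
}

func main() {
	fmt.Println(newRegionEpoch(1))       // {0 0}: Bootstrap-style call, default epoch
	fmt.Println(newRegionEpoch(1, 5, 9)) // {5 9}: PutRegion-style call with explicit epoch
}
```

Passing anything other than exactly two trailing values silently leaves the epoch at its zero value, matching the `len(epoch) == 2` guard in `newRegion`.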
