diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index a0b422fe4..f13cae616 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -316,6 +316,11 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
 	return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
 }
 
+// isCacheTTLExpired reports whether the region's cache entry is stale at
+// timestamp ts (Unix seconds). lastAccess is read atomically because it is
+// refreshed concurrently by readers via checkRegionCacheTTL.
+func (r *Region) isCacheTTLExpired(ts int64) bool {
+	lastAccess := atomic.LoadInt64(&r.lastAccess)
+	return ts-lastAccess > regionCacheTTLSec
+}
+
 func (r *Region) checkRegionCacheTTL(ts int64) bool {
 	// Only consider use percentage on this failpoint, for example, "2%return"
 	if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
@@ -423,6 +428,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
 	interval := config.GetGlobalConfig().StoresRefreshInterval
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
+	go c.cacheGC()
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	return c
 }
@@ -1906,6 +1912,72 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsVer uint64) {
 	}
 }
 
+const cleanCacheInterval = 100 * time.Millisecond
+const cleanRegionNumPerRound = 10
+
+// This function is expected to run in a background goroutine.
+// It keeps iterating over the whole region cache, searching for stale region
+// info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
+// regions. In this way, the impact of this background goroutine should be
+// negligible.
+func (c *RegionCache) cacheGC() {
+	ticker := time.NewTicker(cleanCacheInterval)
+	defer ticker.Stop()
+
+	beginning := newBtreeSearchItem([]byte(""))
+	iterItem := beginning
+	expired := make([]*btreeItem, 0, cleanRegionNumPerRound)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			count := 0
+			reachedEnd := true
+			expired = expired[:0]
+
+			// Only RLock when checking TTL to avoid blocking other readers
+			c.mu.RLock()
+			ts := time.Now().Unix()
+			c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
+				// Pause after exactly cleanRegionNumPerRound regions and
+				// remember where to resume on the next tick.
+				if count >= cleanRegionNumPerRound {
+					iterItem = item
+					reachedEnd = false
+					return false
+				}
+				count++
+				if item.cachedRegion.isCacheTTLExpired(ts) {
+					expired = append(expired, item)
+				}
+				return true
+			})
+			c.mu.RUnlock()
+
+			// Wrap around once the whole btree has been scanned so that
+			// regions before iterItem are revisited in later rounds.
+			if reachedEnd {
+				iterItem = beginning
+			}
+
+			if len(expired) > 0 {
+				c.mu.Lock()
+				for _, item := range expired {
+					// Re-check under the write lock: the region may have
+					// been accessed again (lastAccess refreshed) between
+					// RUnlock and Lock.
+					if !item.cachedRegion.isCacheTTLExpired(ts) {
+						continue
+					}
+					c.mu.sorted.b.Delete(item)
+					c.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
+				}
+				c.mu.Unlock()
+			}
+		}
+	}
+}
+
 // btreeItem is BTree's Item that uses []byte to compare.
 type btreeItem struct {
 	key []byte
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 5f659a5f5..27fbdff05 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -40,6 +40,7 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1645,3 +1646,39 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
 	s.Error(err)
 	s.False(shouldRetry)
 }
+
+func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
+	// Prepare 50 regions
+	regionCnt := 50
+	regions := s.cluster.AllocIDs(regionCnt)
+	regions = append([]uint64{s.region1}, regions...)
+	peers := [][]uint64{{s.peer1, s.peer2}}
+	for i := 0; i < regionCnt; i++ {
+		peers = append(peers, s.cluster.AllocIDs(2))
+	}
+	for i := 0; i < regionCnt; i++ {
+		s.cluster.Split(regions[i], regions[i+1], []byte(fmt.Sprintf(regionSplitKeyFormat, i)), peers[i+1], peers[i+1][0])
+	}
+	loadRegionsToCache(s.cache, regionCnt)
+	s.checkCache(regionCnt)
+
+	// Make parts of the regions stale
+	remaining := 0
+	s.cache.mu.Lock()
+	now := time.Now().Unix()
+	for verID, r := range s.cache.mu.regions {
+		if verID.id%3 == 0 {
+			atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
+		} else {
+			remaining++
+		}
+	}
+	s.cache.mu.Unlock()
+
+	s.Eventually(func() bool {
+		s.cache.mu.RLock()
+		defer s.cache.mu.RUnlock()
+		return len(s.cache.mu.regions) == remaining
+	}, 2*time.Second, 200*time.Millisecond)
+	s.checkCache(remaining)
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 9eeba8ef0..79e401415 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -320,7 +320,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
 	cache := NewRegionCache(s.cache.pdClient)
 	defer cache.Close()
 
+	cache.mu.Lock()
 	cache.insertRegionToCache(region)
+	cache.mu.Unlock()
 
 	// Test accessFollower state with kv.ReplicaReadLearner request type.
 	region.lastAccess = time.Now().Unix()
@@ -369,7 +371,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	cache := NewRegionCache(s.cache.pdClient)
 	defer cache.Close()
 
+	cache.mu.Lock()
 	cache.insertRegionToCache(region)
+	cache.mu.Unlock()
 
 	// Verify creating the replicaSelector.
 	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
@@ -590,7 +594,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	s.Nil(err)
 
 	// Test accessFollower state filtering epoch-stale stores.
-	region.lastAccess = time.Now().Unix()
+	atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
 	refreshEpochs(regionStore)
 	// Mark all followers as stale.
 	tiKVNum := regionStore.accessStoreNum(tiKVOnly)