From b670f6c3eb65996991ca7029151b0fd306dfc6fb Mon Sep 17 00:00:00 2001
From: Yilin Chen
Date: Mon, 16 Jan 2023 17:49:18 +0800
Subject: [PATCH] Add a background region cache GC goroutine (#664) (#668)

Signed-off-by: Yilin Chen
---
 internal/locate/region_cache.go         | 57 +++++++++++++++++++++++++
 internal/locate/region_cache_test.go    | 37 ++++++++++++++++
 internal/locate/region_request3_test.go |  4 +-
 3 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 88ebdf5b2..457377a7e 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -278,6 +278,11 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
 	return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
 }
 
+func (r *Region) isCacheTTLExpired(ts int64) bool {
+	lastAccess := atomic.LoadInt64(&r.lastAccess)
+	return ts-lastAccess > regionCacheTTLSec
+}
+
 func (r *Region) checkRegionCacheTTL(ts int64) bool {
 	// Only consider use percentage on this failpoint, for example, "2%return"
 	if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
@@ -371,6 +376,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
 	c.ctx, c.cancelFunc = context.WithCancel(context.Background())
 	interval := config.GetGlobalConfig().StoresRefreshInterval
 	go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
+	go c.cacheGC()
 	c.enableForwarding = config.GetGlobalConfig().EnableForwarding
 	return c
 }
@@ -1627,6 +1633,57 @@ func (c *RegionCache) GetTiFlashStores() []*Store {
 	return stores
 }
 
+const cleanCacheInterval = time.Second
+const cleanRegionNumPerRound = 50
+
+// This function is expected to run in a background goroutine.
+// It keeps iterating over the whole region cache, searching for stale region
+// info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
+// regions. In this way, the impact of this background goroutine should be
+// negligible.
+func (c *RegionCache) cacheGC() {
+	ticker := time.NewTicker(cleanCacheInterval)
+	defer ticker.Stop()
+
+	iterItem := newBtreeSearchItem([]byte(""))
+	expired := make([]*btreeItem, cleanRegionNumPerRound)
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		case <-ticker.C:
+			count := 0
+			expired = expired[:0]
+
+			// Only RLock when checking TTL to avoid blocking other readers
+			c.mu.RLock()
+			ts := time.Now().Unix()
+			c.mu.sorted.AscendGreaterOrEqual(iterItem, func(item_ btree.Item) bool {
+				item := item_.(*btreeItem)
+				if count > cleanRegionNumPerRound {
+					iterItem = item
+					return false
+				}
+				count++
+				if item.cachedRegion.isCacheTTLExpired(ts) {
+					expired = append(expired, item)
+				}
+				return true
+			})
+			c.mu.RUnlock()
+
+			if len(expired) > 0 {
+				c.mu.Lock()
+				for _, item := range expired {
+					c.mu.sorted.Delete(item)
+					c.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
+				}
+				c.mu.Unlock()
+			}
+		}
+	}
+}
+
 // btreeItem is BTree's Item that uses []byte to compare.
 type btreeItem struct {
 	key []byte
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 83fe6d1a3..6d6f58ada 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -39,6 +39,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -1441,3 +1442,39 @@ func (s *testRegionCacheSuite) TestNoBackoffWhenFailToDecodeRegion() {
 	s.NotNil(err)
 	s.Equal(0, s.bo.GetTotalBackoffTimes())
 }
+
+func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
+	// Prepare 50 regions
+	regionCnt := 50
+	regions := s.cluster.AllocIDs(regionCnt)
+	regions = append([]uint64{s.region1}, regions...)
+	peers := [][]uint64{{s.peer1, s.peer2}}
+	for i := 0; i < regionCnt; i++ {
+		peers = append(peers, s.cluster.AllocIDs(2))
+	}
+	for i := 0; i < regionCnt; i++ {
+		s.cluster.Split(regions[i], regions[i+1], []byte(fmt.Sprintf(regionSplitKeyFormat, i)), peers[i+1], peers[i+1][0])
+	}
+	loadRegionsToCache(s.cache, regionCnt)
+	s.checkCache(regionCnt)
+
+	// Make parts of the regions stale
+	remaining := 0
+	s.cache.mu.Lock()
+	now := time.Now().Unix()
+	for verID, r := range s.cache.mu.regions {
+		if verID.id%3 == 0 {
+			atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
+		} else {
+			remaining++
+		}
+	}
+	s.cache.mu.Unlock()
+
+	s.Eventually(func() bool {
+		s.cache.mu.RLock()
+		defer s.cache.mu.RUnlock()
+		return len(s.cache.mu.regions) == remaining
+	}, 2*time.Second, 200*time.Millisecond)
+	s.checkCache(remaining)
+}
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index 0aaf9f58c..0492eaedd 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -304,7 +304,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 
 	cache := NewRegionCache(s.cache.pdClient)
 	defer cache.Close()
+	cache.mu.Lock()
 	cache.insertRegionToCache(region)
+	cache.mu.Unlock()
 
 	// Verify creating the replicaSelector.
 	replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
@@ -537,7 +539,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	s.Nil(err)
 
 	// Test accessFollower state filtering epoch-stale stores.
-	region.lastAccess = time.Now().Unix()
+	atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
 	refreshEpochs(regionStore)
 	// Mark all followers as stale.
 	tiKVNum := regionStore.accessStoreNum(tiKVOnly)
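
The heart of the patch is the bounded scan-and-evict loop in cacheGC above. As a reading aid only, here is a minimal, self-contained Go sketch of the same pattern built directly on github.com/google/btree; every name in it (package cachegc, entry, ttlCache, itemsPerRound, and so on) is a hypothetical stand-in rather than a client-go identifier, and the wrap-around of the resume point after a full pass is an addition of the sketch, not something taken from the patch.

package cachegc

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/btree"
)

const (
	entryTTL      = int64(600) // seconds an entry may stay unread (hypothetical value)
	gcInterval    = time.Second
	itemsPerRound = 50
)

// entry is a cache item ordered by key inside the btree.
type entry struct {
	key        []byte
	lastAccess int64 // unix seconds, written with atomics by readers
}

func (e *entry) Less(than btree.Item) bool {
	return string(e.key) < string(than.(*entry).key)
}

// ttlCache keeps entries in a btree guarded by a single RWMutex.
type ttlCache struct {
	mu     sync.RWMutex
	sorted *btree.BTree // holds *entry items
}

func newTTLCache() *ttlCache {
	return &ttlCache{sorted: btree.New(32)}
}

// gcLoop checks at most itemsPerRound entries per tick: expired entries are
// collected under the read lock and deleted under the write lock.
func (c *ttlCache) gcLoop(ctx context.Context) {
	ticker := time.NewTicker(gcInterval)
	defer ticker.Stop()

	begin := &entry{key: []byte("")}
	iter := begin // resume point carried between rounds
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			count := 0
			var expired []*entry
			finished := true

			// Hold only the read lock while checking TTLs so that
			// concurrent lookups are never blocked by the scan.
			c.mu.RLock()
			now := time.Now().Unix()
			c.sorted.AscendGreaterOrEqual(iter, func(it btree.Item) bool {
				e := it.(*entry)
				if count >= itemsPerRound {
					iter = e // continue from here next round
					finished = false
					return false
				}
				count++
				if now-atomic.LoadInt64(&e.lastAccess) > entryTTL {
					expired = append(expired, e)
				}
				return true
			})
			c.mu.RUnlock()

			// Reached the end of the tree; wrap around next round.
			if finished {
				iter = begin
			}

			// Take the write lock only briefly, to drop what expired.
			if len(expired) > 0 {
				c.mu.Lock()
				for _, e := range expired {
					c.sorted.Delete(e)
				}
				c.mu.Unlock()
			}
		}
	}
}

The split locking is the point of the design: TTL checks run under the read lock so lookups proceed concurrently with the scan, the write lock is held only long enough to delete what was found expired, and capping each round at a fixed number of entries keeps both the CPU cost and the lock hold time of a single tick small, which is why the patch's comment can describe the background goroutine's impact as negligible.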