Skip to content

Commit

Permalink
Add a background region cache GC goroutine
Browse files Browse the repository at this point in the history
Signed-off-by: Yilin Chen <sticnarf@gmail.com>
  • Loading branch information
sticnarf committed Jan 11, 2023
1 parent 1b1a805 commit 9ec1fe4
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 1 deletion.
56 changes: 56 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,11 @@ func (r *Region) compareAndSwapStore(oldStore, newStore *regionStore) bool {
return atomic.CompareAndSwapPointer(&r.store, unsafe.Pointer(oldStore), unsafe.Pointer(newStore))
}

func (r *Region) isCacheTTLExpired(ts int64) bool {
lastAccess := atomic.LoadInt64(&r.lastAccess)
return ts-lastAccess > regionCacheTTLSec
}

func (r *Region) checkRegionCacheTTL(ts int64) bool {
// Only consider use percentage on this failpoint, for example, "2%return"
if _, err := util.EvalFailpoint("invalidateRegionCache"); err == nil {
Expand Down Expand Up @@ -423,6 +428,7 @@ func NewRegionCache(pdClient pd.Client) *RegionCache {
c.ctx, c.cancelFunc = context.WithCancel(context.Background())
interval := config.GetGlobalConfig().StoresRefreshInterval
go c.asyncCheckAndResolveLoop(time.Duration(interval) * time.Second)
go c.cacheGC()
c.enableForwarding = config.GetGlobalConfig().EnableForwarding
return c
}
Expand Down Expand Up @@ -1906,6 +1912,56 @@ func (c *RegionCache) UpdateBucketsIfNeeded(regionID RegionVerID, latestBucketsV
}
}

const cleanCacheInterval = 100 * time.Millisecond
const cleanRegionNumPerRound = 10

// This function is expected to run in a background goroutine.
// It keeps iterating over the whole region cache, searching for stale region
// info. It runs at cleanCacheInterval and checks only cleanRegionNumPerRound
// regions. In this way, the impact of this background goroutine should be
// negligible.
func (c *RegionCache) cacheGC() {
ticker := time.NewTicker(cleanCacheInterval)
defer ticker.Stop()

iterItem := newBtreeSearchItem([]byte(""))
expired := make([]*btreeItem, cleanRegionNumPerRound)
for {
select {
case <-c.ctx.Done():
return
case <-ticker.C:
count := 0
expired = expired[:0]

// Only RLock when checking TTL to avoid blocking other readers
c.mu.RLock()
ts := time.Now().Unix()
c.mu.sorted.b.AscendGreaterOrEqual(iterItem, func(item *btreeItem) bool {
if count > cleanRegionNumPerRound {
iterItem = item
return false
}
count++
if item.cachedRegion.isCacheTTLExpired(ts) {
expired = append(expired, item)
}
return true
})
c.mu.RUnlock()

if len(expired) > 0 {
c.mu.Lock()
for _, item := range expired {
c.mu.sorted.b.Delete(item)
c.removeVersionFromCache(item.cachedRegion.VerID(), item.cachedRegion.GetID())
}
c.mu.Unlock()
}
}
}
}

// btreeItem is BTree's Item that uses []byte to compare.
type btreeItem struct {
key []byte
Expand Down
37 changes: 37 additions & 0 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"fmt"
"math/rand"
"reflect"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1645,3 +1646,39 @@ func (s *testRegionCacheSuite) TestShouldNotRetryFlashback() {
s.Error(err)
s.False(shouldRetry)
}

func (s *testRegionCacheSuite) TestBackgroundCacheGC() {
// Prepare 50 regions
regionCnt := 50
regions := s.cluster.AllocIDs(regionCnt)
regions = append([]uint64{s.region1}, regions...)
peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < regionCnt; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}
for i := 0; i < regionCnt; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte(fmt.Sprintf(regionSplitKeyFormat, i)), peers[i+1], peers[i+1][0])
}
loadRegionsToCache(s.cache, regionCnt)
s.checkCache(regionCnt)

// Make parts of the regions stale
remaining := 0
s.cache.mu.Lock()
now := time.Now().Unix()
for verID, r := range s.cache.mu.regions {
if verID.id%3 == 0 {
atomic.StoreInt64(&r.lastAccess, now-regionCacheTTLSec-10)
} else {
remaining++
}
}
s.cache.mu.Unlock()

s.Eventually(func() bool {
s.cache.mu.RLock()
defer s.cache.mu.RUnlock()
return len(s.cache.mu.regions) == remaining
}, 2*time.Second, 200*time.Millisecond)
s.checkCache(remaining)
}
6 changes: 5 additions & 1 deletion internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {

cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.mu.Unlock()

// Test accessFollower state with kv.ReplicaReadLearner request type.
region.lastAccess = time.Now().Unix()
Expand Down Expand Up @@ -369,7 +371,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {

cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.mu.Lock()
cache.insertRegionToCache(region)
cache.mu.Unlock()

// Verify creating the replicaSelector.
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
Expand Down Expand Up @@ -590,7 +594,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)

// Test accessFollower state filtering epoch-stale stores.
region.lastAccess = time.Now().Unix()
atomic.StoreInt64(&region.lastAccess, time.Now().Unix())
refreshEpochs(regionStore)
// Mark all followers as stale.
tiKVNum := regionStore.accessStoreNum(tiKVOnly)
Expand Down

0 comments on commit 9ec1fe4

Please sign in to comment.