Skip to content

Commit

Permalink
receive/expandedpostingscache: fix race (#7937)
Browse files Browse the repository at this point in the history
Porting cortexproject/cortex#6369 to our code
base. Add test that fails without the fix.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS authored Nov 25, 2024
1 parent b144eba commit a55844d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 10 deletions.
22 changes: 13 additions & 9 deletions pkg/receive/expandedpostingscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type BlocksPostingsForMatchersCache struct {
postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error)
timeNow func() time.Time

metrics *ExpandedPostingsCacheMetrics
metrics ExpandedPostingsCacheMetrics
}

var (
Expand All @@ -66,8 +66,8 @@ type ExpandedPostingsCacheMetrics struct {
NonCacheableQueries *prometheus.CounterVec
}

func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics {
return &ExpandedPostingsCacheMetrics{
func NewPostingCacheMetrics(r prometheus.Registerer) ExpandedPostingsCacheMetrics {
return ExpandedPostingsCacheMetrics{
CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "expanded_postings_cache_requests_total",
Help: "Total number of requests to the cache.",
Expand All @@ -87,11 +87,15 @@ func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetri
}
}

func NewBlocksPostingsForMatchersCache(metrics *ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64) ExpandedPostingsCache {
func NewBlocksPostingsForMatchersCache(metrics ExpandedPostingsCacheMetrics, headExpandedPostingsCacheSize uint64, blockExpandedPostingsCacheSize uint64, seedSize int64) *BlocksPostingsForMatchersCache {
if seedSize <= 0 {
seedSize = seedArraySize
}

return &BlocksPostingsForMatchersCache{
headCache: newFifoCache[[]storage.SeriesRef]("head", metrics, time.Now, headExpandedPostingsCacheSize),
blocksCache: newFifoCache[[]storage.SeriesRef]("block", metrics, time.Now, blockExpandedPostingsCacheSize),
headSeedByMetricName: make([]int, seedArraySize),
headSeedByMetricName: make([]int, seedSize),
strippedLock: make([]sync.RWMutex, numOfSeedsStripes),
postingsForMatchersFunc: tsdb.PostingsForMatchers,
timeNow: time.Now,
Expand Down Expand Up @@ -129,7 +133,7 @@ func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) {

h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
l := i % uint64(len(c.strippedLock))
c.strippedLock[l].Lock()
defer c.strippedLock[l].Unlock()
c.headSeedByMetricName[i]++
Expand Down Expand Up @@ -200,7 +204,7 @@ func (c *BlocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
h := MemHashString(metricName)
i := h % uint64(len(c.headSeedByMetricName))
l := h % uint64(len(c.strippedLock))
l := i % uint64(len(c.strippedLock))
c.strippedLock[l].RLock()
defer c.strippedLock[l].RUnlock()
return strconv.Itoa(c.headSeedByMetricName[i])
Expand Down Expand Up @@ -276,13 +280,13 @@ type fifoCache[V any] struct {
cachedBytes int64
}

func newFifoCache[V any](name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
func newFifoCache[V any](name string, metrics ExpandedPostingsCacheMetrics, timeNow func() time.Time, maxBytes uint64) *fifoCache[V] {
return &fifoCache[V]{
cachedValues: new(sync.Map),
cached: list.New(),
timeNow: timeNow,
name: name,
metrics: *metrics,
metrics: metrics,
ttl: 10 * time.Minute,
maxBytes: int64(maxBytes),
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/receive/expandedpostingscache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"time"

"go.uber.org/atomic"
"golang.org/x/exp/rand"

"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -166,3 +168,43 @@ func repeatStringIfNeeded(seed string, length int) string {

return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))]
}

func TestLockRaceExpireSeries(t *testing.T) {
for j := 0; j < 10; j++ {
wg := &sync.WaitGroup{}

c := NewBlocksPostingsForMatchersCache(ExpandedPostingsCacheMetrics{}, 1<<7, 1<<7, 3)
for i := 0; i < 1000; i++ {
wg.Add(2)

go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
c.ExpireSeries(
labels.FromMap(map[string]string{"__name__": randSeq(10)}),
)
}
}()

go func() {
defer wg.Done()

for i := 0; i < 10; i++ {
c.getSeedForMetricName(randSeq(10))
}
}()
}
wg.Wait()
}
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randSeq(n int) string {
b := make([]rune, n)
rand.Seed(uint64(time.Now().UnixNano()))
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
if t.headExpandedPostingsCacheSize > 0 || t.blockExpandedPostingsCacheSize > 0 {
var expandedPostingsCacheMetrics = expandedpostingscache.NewPostingCacheMetrics(extprom.WrapRegistererWithPrefix("thanos_", reg))

expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize)
expandedPostingsCache = expandedpostingscache.NewBlocksPostingsForMatchersCache(expandedPostingsCacheMetrics, t.headExpandedPostingsCacheSize, t.blockExpandedPostingsCacheSize, 0)
}

opts := *t.tsdbOpts
Expand Down

0 comments on commit a55844d

Please sign in to comment.