From cae2727c44dea21da22de1c88dc34d2e5d5a5ea5 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 25 Sep 2023 00:42:33 -0700 Subject: [PATCH 1/3] add latency histogram for fetching index cache Signed-off-by: Ben Ye --- pkg/store/cache/cache.go | 6 ++++++ pkg/store/cache/inmemory.go | 10 ++++++++++ pkg/store/cache/memcached.go | 34 +++++++++++++++++++++++++--------- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index 87cdb17d96..360cdd67e5 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -61,6 +61,7 @@ type commonMetrics struct { requestTotal *prometheus.CounterVec hitsTotal *prometheus.CounterVec dataSizeBytes *prometheus.HistogramVec + fetchLatency *prometheus.HistogramVec } func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { @@ -80,6 +81,11 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 64 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, }, }, []string{"item_type"}), + fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_store_index_cache_fetch_duration_seconds", + Help: "Histogram to track latency to fetch items from index cache", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 30, 45, 60, 90, 120}, + }, []string{"item_type"}), } } diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index 747199b414..dd479fd053 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -7,6 +7,7 @@ import ( "context" "reflect" "sync" + "time" "unsafe" "github.com/go-kit/log" @@ -302,6 +303,9 @@ func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { + begin := time.Now() + defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings).Observe(float64(time.Since(begin))) + hits = map[labels.Label][]byte{} blockIDKey := blockID.String() @@ -325,6 +329,9 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [ // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { + begin := time.Now() + defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(time.Since(begin))) + if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { return b, true } @@ -341,6 +348,9 @@ func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + begin := time.Now() + defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries).Observe(float64(time.Since(begin))) + hits = map[storage.SeriesRef][]byte{} blockIDKey := blockID.String() diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 9292f3ed59..106a612bbe 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -33,15 +33,18 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. - postingRequests prometheus.Counter - seriesRequests prometheus.Counter - expandedPostingRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter - expandedPostingHits prometheus.Counter - postingDataSizeBytes prometheus.Observer - expandedPostingDataSizeBytes prometheus.Observer - seriesDataSizeBytes prometheus.Observer + postingRequests prometheus.Counter + seriesRequests prometheus.Counter + expandedPostingRequests prometheus.Counter + postingHits prometheus.Counter + seriesHits prometheus.Counter + expandedPostingHits prometheus.Counter + postingDataSizeBytes prometheus.Observer + expandedPostingDataSizeBytes prometheus.Observer + seriesDataSizeBytes prometheus.Observer + postingsFetchDuration prometheus.Observer + expandedPostingsFetchDuration prometheus.Observer + seriesFetchDuration prometheus.Observer } // NewRemoteIndexCache makes a new RemoteIndexCache. @@ -68,6 +71,10 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries) c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings) + c.postingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings) + c.seriesFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries) + c.expandedPostingsFetchDuration = commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings) + level.Info(logger).Log("msg", "created index cache") return c, nil @@ -88,6 +95,9 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { + begin := time.Now() + defer c.postingsFetchDuration.Observe(float64(time.Since(begin))) + keys := make([]string, 0, len(lbls)) blockIDKey := blockID.String() @@ -138,6 +148,9 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { + begin := time.Now() + defer c.postingsFetchDuration.Observe(float64(time.Since(begin))) + key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() // Fetch the keys from memcached in a single request. @@ -169,6 +182,9 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + begin := time.Now() + defer c.seriesFetchDuration.Observe(float64(time.Since(begin))) + keys := make([]string, 0, len(ids)) blockIDKey := blockID.String() From d99d7f372810acddea93fccc9cd482a91b99d322 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 25 Sep 2023 00:48:49 -0700 Subject: [PATCH 2/3] update changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65077caf00..5aa65ef482 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified. - [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message. +- [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache. ### Changed From 1fec2db4cd4aafef84d958c10102c3ccaf6876c6 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 25 Sep 2023 08:58:06 -0700 Subject: [PATCH 3/3] use timer Signed-off-by: Ben Ye --- pkg/store/cache/inmemory.go | 13 ++++++------- pkg/store/cache/memcached.go | 12 ++++++------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index dd479fd053..e0077acc35 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -7,7 +7,6 @@ import ( "context" "reflect" "sync" - "time" "unsafe" "github.com/go-kit/log" @@ -303,8 +302,8 @@ func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v // FetchMultiPostings fetches multiple postings - each identified by a label - // and returns a map containing cache hits, along with a list of missing keys. func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - begin := time.Now() - defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings).Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings)) + defer timer.ObserveDuration() hits = map[labels.Label][]byte{} @@ -329,8 +328,8 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [ // FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { - begin := time.Now() - defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings)) + defer timer.ObserveDuration() if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}); ok { return b, true @@ -348,8 +347,8 @@ func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef // FetchMultiSeries fetches multiple series - each identified by ID - from the cache // and returns a map containing cache hits, along with a list of missing IDs. func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - begin := time.Now() - defer c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries).Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries)) + defer timer.ObserveDuration() hits = map[storage.SeriesRef][]byte{} diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index 106a612bbe..a3dbce9940 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -95,8 +95,8 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v [] // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { - begin := time.Now() - defer c.postingsFetchDuration.Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.postingsFetchDuration) + defer timer.ObserveDuration() keys := make([]string, 0, len(lbls)) @@ -148,8 +148,8 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe // and returns a map containing cache hits, along with a list of missing keys. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, lbls []*labels.Matcher) ([]byte, bool) { - begin := time.Now() - defer c.postingsFetchDuration.Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.postingsFetchDuration) + defer timer.ObserveDuration() key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(lbls)), c.compressionScheme}.string() @@ -182,8 +182,8 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, // and returns a map containing cache hits, along with a list of missing IDs. // In case of error, it logs and return an empty cache hits map. func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { - begin := time.Now() - defer c.seriesFetchDuration.Observe(float64(time.Since(begin))) + timer := prometheus.NewTimer(c.postingsFetchDuration) + defer timer.ObserveDuration() keys := make([]string, 0, len(ids))