Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces cache to TSDB postings #9621

Merged
merged 96 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
bc286f8
hacky cached postings
DylanGuedes Jun 4, 2023
da40b40
Change signatures
DylanGuedes Jun 5, 2023
2753a89
tmp inherit thanos indexcache code
DylanGuedes Jun 7, 2023
64a0af7
make caching configurable
DylanGuedes Jun 8, 2023
e37da4d
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 12, 2023
369869f
Implement LRU as a possible cache option
DylanGuedes Jun 12, 2023
6878e4b
Add tests
DylanGuedes Jun 12, 2023
b5be8fe
delete indexcache folder
DylanGuedes Jun 12, 2023
f795168
undo change
DylanGuedes Jun 12, 2023
e9d2650
trim down code
DylanGuedes Jun 12, 2023
1ff0663
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 12, 2023
3b5ab8f
Use postingsclient.
DylanGuedes Jun 12, 2023
317a879
Use right cache name.
DylanGuedes Jun 15, 2023
66c7f0d
Rename flag and remove unused var.
DylanGuedes Jun 15, 2023
71cac4e
Make the metrics more consistent.
DylanGuedes Jun 15, 2023
bf330fc
Pass canonical keys directly.
DylanGuedes Jun 15, 2023
4a4d1fd
Pass ctx directly.
DylanGuedes Jun 15, 2023
88fe691
Reuse ctx
DylanGuedes Jun 15, 2023
604de2f
Append series numbers to encoded binary
DylanGuedes Jun 15, 2023
99268de
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 19, 2023
4e784f3
Rename to postingsReader
DylanGuedes Jun 19, 2023
a7d48a8
lint fix
DylanGuedes Jun 19, 2023
bba1de5
Implement overflow logic
DylanGuedes Jun 19, 2023
03d0cc8
Fix tests
DylanGuedes Jun 19, 2023
a98cfea
Finish fixing tests.
DylanGuedes Jun 19, 2023
7f4e582
Fix lint
DylanGuedes Jun 20, 2023
8c71ebf
Fix lint
DylanGuedes Jun 20, 2023
64ad6ca
Rename client->reader
DylanGuedes Jun 20, 2023
e9ef21a
Update calls (haudi suggestions)
DylanGuedes Jun 20, 2023
68361f3
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 20, 2023
84f68df
Fix tests
DylanGuedes Jun 20, 2023
210ac07
Make sure cache is used on tests
DylanGuedes Jun 22, 2023
eef25e5
remove consts only used by mimir
DylanGuedes Jun 22, 2023
9b902d1
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 22, 2023
d27286c
appease lint (remove TSDB from struct name)
DylanGuedes Jun 22, 2023
1e6fd33
fix lint
DylanGuedes Jun 22, 2023
2030224
fix import order
DylanGuedes Jun 22, 2023
33521a1
fix import order again
DylanGuedes Jun 22, 2023
52ca301
encode with 32b instead of 64b (tsdb uses 32b internally)
DylanGuedes Jun 22, 2023
2adc870
Use "," to separate matchers.
DylanGuedes Jun 23, 2023
423692b
better defaults
DylanGuedes Jun 23, 2023
474261d
bugged linter
DylanGuedes Jun 23, 2023
2baf30d
fix docs
DylanGuedes Jun 23, 2023
110eac7
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 23, 2023
571c6a8
fix config type
DylanGuedes Jun 23, 2023
4139abd
Register flags
DylanGuedes Jun 23, 2023
3d30b1e
wrap error on vendor
DylanGuedes Jun 23, 2023
7d9a24b
test other thing
DylanGuedes Jun 23, 2023
7edc7bd
wrap errors
DylanGuedes Jun 23, 2023
7d7c827
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jun 23, 2023
8c9fcef
careful wrapping
DylanGuedes Jun 23, 2023
5521d82
wrap at different place
DylanGuedes Jun 23, 2023
0ef38b7
wrap different
DylanGuedes Jun 23, 2023
dcdf6d2
use sharded postings?
DylanGuedes Jun 23, 2023
6c36767
sanity check
DylanGuedes Jun 23, 2023
b84e257
reset postings by calling PostingsForMatcher again.
DylanGuedes Jun 26, 2023
5ff3a36
sanity check
DylanGuedes Jun 26, 2023
d9354f5
Try calling PostingsForMatchers after cache hit too.
DylanGuedes Jun 26, 2023
fb9b541
another sanity check
DylanGuedes Jun 26, 2023
d3a2a5b
debug decoded/encoded series
DylanGuedes Jun 26, 2023
cd0d3a9
was my decoding wrong?
DylanGuedes Jun 26, 2023
f7d8013
cleanup
DylanGuedes Jun 26, 2023
fe659c8
cleanup cached postings file.
DylanGuedes Jun 27, 2023
f07228d
revert vendor changes
DylanGuedes Jun 27, 2023
2f00fb7
Undo change to error messages
DylanGuedes Jun 27, 2023
53162c8
Add flag docs
DylanGuedes Jun 27, 2023
99a4c83
remove unnecessary test
DylanGuedes Jun 27, 2023
8eb1308
Use the checksum as part of the key.
DylanGuedes Jun 28, 2023
c3736af
Add changelog entry.
DylanGuedes Jul 2, 2023
8f556ed
Add functional test.
DylanGuedes Jul 2, 2023
dad71f9
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jul 3, 2023
18da973
Change "cache_postings" -> "enable_cache_postings"
DylanGuedes Jul 11, 2023
942d450
Apply Haudi suggestion (see https://github.com/grafana/loki/pull/9621…
DylanGuedes Jul 11, 2023
fe6c834
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jul 11, 2023
a4979c8
update flag used by e2e test
DylanGuedes Jul 11, 2023
3e0c31d
Refactor how the caching struct is passed
DylanGuedes Jul 11, 2023
fd8b411
fix lint.
DylanGuedes Jul 16, 2023
871e3e4
Use background writes for LRU cache.
DylanGuedes Jul 16, 2023
0c4e141
Add length=0 bypass.
DylanGuedes Jul 18, 2023
67852d9
Change default max item size.
DylanGuedes Jul 18, 2023
715273c
Update docs
DylanGuedes Jul 18, 2023
d2fc841
lint
DylanGuedes Jul 18, 2023
468914a
Implements snappy postings decoding/encoding
DylanGuedes Jul 20, 2023
ab45f97
fix formatting
DylanGuedes Jul 20, 2023
61e1bec
Remove LRU cache.
DylanGuedes Jul 26, 2023
ce02398
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jul 26, 2023
2be199b
fix microservices test
DylanGuedes Jul 26, 2023
07926d5
Rename enable-postings-cache flag.
DylanGuedes Jul 27, 2023
6c7433b
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jul 27, 2023
c3e6e8b
Apply suggestions from code review
DylanGuedes Jul 28, 2023
ca33e61
fix test
DylanGuedes Jul 28, 2023
bfc54c4
add description docs for tsdbshipper
DylanGuedes Jul 28, 2023
8ec02b6
Merge branch 'main' of github.com:grafana/loki into check-index-posti…
DylanGuedes Jul 28, 2023
b87c48d
Test caching behavior on e2e test.
DylanGuedes Aug 1, 2023
ee07d76
update go.mod
DylanGuedes Aug 1, 2023
6177240
change to sorteablelabelmatchers.
DylanGuedes Aug 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
MemcacheClient MemcachedClientConfig `yaml:"memcached_client"`
Redis RedisConfig `yaml:"redis"`
EmbeddedCache EmbeddedCacheConfig `yaml:"embedded_cache"`
LRUCache LRUCacheConfig `yaml:"lru_cache"`
Fifocache FifoCacheConfig `yaml:"fifocache"` // deprecated

// This is to name the cache metrics properly.
Expand Down Expand Up @@ -145,6 +146,18 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
}
}

if cfg.LRUCache.Enabled {
cache, err := NewLRUCache(cfg.Prefix+"embedded-cache", cfg.LRUCache, reg, logger, cacheType)
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
level.Error(logger).Log("msg", "failed to initialize LRU cache", "err", err)
return nil, err
}

if cache != nil {
caches = append(caches, CollectStats(Instrument(cfg.Prefix+"embedded-cache", cache, reg)))
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}
}

if IsMemcacheSet(cfg) && IsRedisSet(cfg) {
return nil, errors.New("use of multiple cache storage systems is not supported")
}
Expand Down
249 changes: 249 additions & 0 deletions pkg/storage/chunk/cache/lru_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package cache

import (
"context"
"flag"
"fmt"
"sync"
"unsafe"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/loki/pkg/logqlmodel/stats"
util_log "github.com/grafana/loki/pkg/util/log"
lru "github.com/hashicorp/golang-lru/simplelru"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// codec is a short string tag. The constants below are named as encoding
// headers; nothing in the visible part of this file references them.
// NOTE(review): presumably they prefix encoded postings payloads — confirm
// against the encoder/decoder that consumes them.
type codec string

const (
// codecHeaderSnappy tags the "diff+varint+snappy" encoding.
codecHeaderSnappy codec = "dvs" // As in "diff+varint+snappy".
// codecHeaderSnappyWithMatchers tags the same encoding with matchers added.
codecHeaderSnappyWithMatchers codec = "dm" // As in "dvs+matchers"
)
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved

// DefaultLRUCacheConfig is a ready-made LRU cache configuration with a 250MB
// size limit. Enabled is left at its zero value (false).
// NOTE(review): RegisterFlagsWithPrefix uses "500MB" as the flag default, so
// the two defaults disagree — confirm which one is intended.
var DefaultLRUCacheConfig = LRUCacheConfig{
MaxSizeBytes: "250MB",
}
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved

// maxInt is the largest value representable by the platform's int type: all
// bits of uint set, shifted right once to clear what would be the sign bit.
const maxInt = int(^uint(0) >> 1)
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved

// Fixed byte sizes, by name those of a string header and a slice header.
// NOTE(review): neither constant is referenced in the visible part of this
// file, and the values do not obviously match the 64-bit runtime header sizes
// — confirm they are still needed and correct.
const (
stringHeaderSize = 8
sliceHeaderSize = 16
)

// ulidSize is the length of a zero-valued ulid.ULID, as a uint64.
// It is not referenced in the visible part of this file.
var ulidSize = uint64(len(ulid.ULID{}))

// LRUCacheConfig holds the user-facing settings of the in-memory LRU cache.
type LRUCacheConfig struct {
// MaxSizeBytes is the cache size limit as a human-readable string
// (for example "250MB"); it is parsed with parsebytes.
MaxSizeBytes string `yaml:"max_size_bytes"`

// Enabled turns the LRU cache on; it is only settable through YAML, since
// RegisterFlagsWithPrefix registers no flag for it.
Enabled bool `yaml:"enabled"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *LRUCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", "500MB", description+"Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.")
DylanGuedes marked this conversation as resolved.
Show resolved Hide resolved
}

// Validate reports whether MaxSizeBytes can be parsed by parsebytes. The
// parsed size itself is discarded; only the parse error (if any) is returned.
func (cfg *LRUCacheConfig) Validate() error {
	if _, parseErr := parsebytes(cfg.MaxSizeBytes); parseErr != nil {
		return parseErr
	}
	return nil
}

// LRUCache is an in-memory cache backed by a simplelru.LRU. Access to the
// underlying LRU goes through mtx, and every operation is reflected in the
// Prometheus collectors held below.
type LRUCache struct {
// cacheType is the value returned by GetCacheType.
cacheType stats.CacheType

// done is closed by Stop. Nothing in the visible part of this file receives from it.
done chan struct{}

// mtx is held by get, set and Stop while they touch lru.
mtx sync.Mutex

logger log.Logger
lru *lru.LRU
// maxSizeBytes is the parsed value of LRUCacheConfig.MaxSizeBytes.
// NOTE(review): no visible code compares the stored size against it.
maxSizeBytes uint64
// maxItemSizeBytes is only logged by NewLRUCache; no visible code assigns it.
maxItemSizeBytes uint64

// evicted counts onEvict callbacks.
evicted *prometheus.CounterVec
// requests counts calls to get.
requests *prometheus.CounterVec
// hits counts get calls that found an entry.
hits *prometheus.CounterVec
// totalMisses counts get calls that found nothing.
totalMisses prometheus.Counter
// added counts entries inserted by set.
added *prometheus.CounterVec
// current tracks the number of entries currently stored.
current *prometheus.GaugeVec
// bytesInUse tracks the estimated memory used by stored entries.
bytesInUse prometheus.Gauge
// overflow is registered by NewLRUCache but not incremented in the visible code.
overflow *prometheus.CounterVec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume these metrics are used by all different cache implementations. Would it make sense to move them into a separate struct so they can be re-used?

}

func NewLRUCache(name string, cfg LRUCacheConfig, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) (*LRUCache, error) {
util_log.WarnExperimentalUse(fmt.Sprintf("In-memory (LRU) cache - %s", name), logger)

maxSizeBytes, _ := parsebytes(cfg.MaxSizeBytes)

c := &LRUCache{
cacheType: cacheType,

maxSizeBytes: maxSizeBytes,
logger: logger,

done: make(chan struct{}),
}

c.totalMisses = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "misses_total",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please check the metrics instantiation for consistent usage of Namespace, Subsystem, Name field usages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I checked after your suggestion and I made it more consistent with the existing FIFO cache, but FYI: it isn't super consistent to other caches (like redis/memcached).

I also removed all references to "index" and "querier" from this implementation since this LRU cache can be used by any Loki subsystem, not necessarily the index.

Help: "The total number of Get calls that had no valid entry",
ConstLabels: prometheus.Labels{"cache": name},
})

c.bytesInUse = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "querier",
Subsystem: "cache",
Name: "memory_bytes",
Help: "The current cache size in bytes",
ConstLabels: prometheus.Labels{"cache": name},
})

c.evicted = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_items_evicted_total",
Help: "Total number of items that were evicted from the index cache.",
}, []string{})

c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_items_added_total",
Help: "Total number of items that were added to the index cache.",
}, []string{})

c.requests = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_requests_total",
Help: "Total number of requests to the cache.",
}, []string{})

c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_items_overflowed_total",
Help: "Total number of items that could not be added to the cache due to being too big.",
}, []string{})

c.hits = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_hits_total",
Help: "Total number of requests to the cache that were a hit.",
}, []string{})

c.current = promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Namespace: "loki",
Name: "index_gateway_index_cache_items",
Help: "Current number of items in the index cache.",
}, []string{})

// Initialize LRU cache with a high size limit since we will manage evictions ourselves
// based on stored size using `RemoveOldest` method.
l, err := lru.NewLRU(maxInt, c.onEvict)
if err != nil {
return nil, err
}
c.lru = l

level.Info(logger).Log(
"msg", "created in-memory index cache",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"maxItems", "maxInt",
)

return c, nil
}

// Fetch implements Cache.
//
// Keys are looked up one at a time, in the order given. Keys that are present
// are appended to found with their value at the same index in bufs; all other
// keys are appended to missing. The returned error is always nil. ctx is not
// consulted.
func (c *LRUCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	n := len(keys)
	found = make([]string, 0, n)
	missing = make([]string, 0, n)
	bufs = make([][]byte, 0, n)

	for _, k := range keys {
		if buf, hit := c.get(k); hit {
			found = append(found, k)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, k)
		}
	}

	return found, bufs, missing, nil
}

// Store implements Cache.
//
// Each key is stored with the value at the same index in values, so values
// must be at least as long as keys. The returned error is always nil. ctx is
// not consulted.
func (c *LRUCache) Store(ctx context.Context, keys []string, values [][]byte) error {
	for idx, key := range keys {
		c.set(key, values[idx])
	}
	return nil
}

// Stop implements Cache.
//
// It closes c.done to signal shutdown and purges every cached entry. Stop may
// be called more than once: the channel is closed only on the first call
// (closing an already closed channel would panic), while the purge runs on
// every call.
func (c *LRUCache) Stop() {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	select {
	case <-c.done:
		// Already closed by an earlier Stop.
	default:
		close(c.done)
	}

	c.reset()
}

// GetCacheType returns the cache type this LRUCache was constructed with.
func (c *LRUCache) GetCacheType() stats.CacheType {
	return c.cacheType
}

// onEvict is the callback handed to the underlying LRU. It bumps the eviction
// counter, decrements the item gauge, and subtracts the entry's estimated
// size from the bytes-in-use gauge. key must be a string and val a []byte;
// any other dynamic type makes the type assertions panic.
func (c *LRUCache) onEvict(key, val interface{}) {
	c.evicted.WithLabelValues().Inc()
	c.current.WithLabelValues().Dec()

	freed := c.entryMemoryUsage(key.(string), val.([]byte))
	c.bytesInUse.Sub(float64(freed))
}

// get looks key up in the LRU while holding the mutex. Every call counts as a
// request; the outcome is then recorded as either a hit or a miss. On a miss
// it returns (nil, false).
func (c *LRUCache) get(key string) ([]byte, bool) {
	c.requests.WithLabelValues().Inc()

	c.mtx.Lock()
	defer c.mtx.Unlock()

	if cached, hit := c.lru.Get(key); hit {
		c.hits.WithLabelValues().Inc()
		return cached.([]byte), true
	}

	c.totalMisses.Inc()
	return nil, false
}

// set stores a private copy of val under key while holding the mutex. If the
// key is already present the call is a no-op and the existing value is kept.
// After inserting, the bytes-in-use gauge, the added counter and the item
// gauge are updated.
//
// NOTE(review): nothing here consults maxSizeBytes or evicts older entries,
// and no RemoveOldest call is visible in this file — confirm that the size
// limit is enforced somewhere, otherwise the cache grows without bound.
func (c *LRUCache) set(key string, val []byte) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if _, exists := c.lru.Get(key); exists {
		return
	}

	// val may be a small window into a much larger backing array; storing a
	// right-sized copy keeps that array from being pinned by the cache.
	stored := append(make([]byte, 0, len(val)), val...)
	c.lru.Add(key, stored)

	size := c.entryMemoryUsage(key, val)
	c.bytesInUse.Add(float64(size))
	c.added.WithLabelValues().Inc()
	c.current.WithLabelValues().Inc()
}

// entryMemoryUsage estimates how many bytes a cache entry occupies: the key
// and value payloads plus the fixed-size string and slice headers that
// describe them.
//
// unsafe.Sizeof on a slice or string yields only the size of its header, never
// the data it points to, so the payload lengths are added explicitly.
func (c *LRUCache) entryMemoryUsage(key string, val []byte) int {
	headers := int(unsafe.Sizeof(key) + unsafe.Sizeof(val))
	return headers + len(key) + len(val)
}

// reset empties the cache and zeroes the size-related gauges. It does not
// lock; Stop calls it with the mutex already held.
func (c *LRUCache) reset() {
	// Purge first, then clear both gauges explicitly.
	c.lru.Purge()

	c.current.Reset()
	c.bytesInUse.Set(0)
}
Loading