Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-expire old items from FIFO cache #5148

Merged
merged 3 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [5262](https://github.com/grafana/loki/pull/5262) **MichelHollands**: Remove the labelFilter field
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.
* [5148](https://github.com/grafana/loki/pull/5148) **chaudum** Add periodic task to prune old expired items from the FIFO cache to free up memory.
* [5187](https://github.com/grafana/loki/pull/5187) **aknuds1** Rename metric `cortex_experimental_features_in_use_total` to `loki_experimental_features_in_use_total` and metric `log_messages_total` to `loki_log_messages_total`.
* [5170](https://github.com/grafana/loki/pull/5170) **chaudum** Fix deadlock in Promtail caused when targets got removed from a target group by the discovery manager.
* [5163](https://github.com/grafana/loki/pull/5163) **chaudum** Fix regression in fluentd plugin introduced with #5107 that caused `NoMethodError` when parsing non-string values of log lines.
Expand Down
7 changes: 6 additions & 1 deletion docs/sources/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1900,10 +1900,15 @@ fifocache:
# CLI flag: -<prefix>.fifocache.max-size-items
[max_size_items: <int> | default = 0]

# The expiry duration for the cache.
# Deprecated: The expiry duration for the cache. Use `-<prefix>.fifocache.ttl`.
# The default value of 0 disables expiration.
# CLI flag: -<prefix>.fifocache.duration
[validity: <duration>]

# The time for items to live in the cache before those items are purged.
# The value of 0 disables auto-expiration.
# CLI flag: -<prefix>.fifocache.ttl
[ttl: <duration> | default = 1h]
```

## schema_config
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func applyFIFOCacheConfig(r *ConfigWrapper) {
// The query results fifocache is still in Cortex so we couldn't change the flag defaults
// so instead we will override them here.
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.MaxSizeBytes = "1GB"
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.Validity = 1 * time.Hour
r.QueryRange.ResultsCacheConfig.CacheConfig.Fifocache.TTL = 1 * time.Hour
}
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
loki_storage "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/storage/stores/shipper"
Expand Down Expand Up @@ -323,12 +322,12 @@ func (t *Loki) initTableManager() (services.Service, error) {

reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)

tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
tableClient, err := chunk_storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
if err != nil {
return nil, err
}

bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config)
bucketClient, err := chunk_storage.NewBucketClient(t.Cfg.StorageConfig.Config)
util_log.CheckFatal("initializing bucket client", err, util_log.Logger)

t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
Expand Down Expand Up @@ -363,7 +362,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// however it has to be deserialized to do so, setting the cache validity to some arbitrary amount less than the
// IndexCacheValidity guarantees the FIFO cache will expire the object first which can be done without
// having to deserialize the object.
Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
TTL: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
// Force the retain period to be longer than the IndexCacheValidity used in the store, this guarantees we don't
Expand Down Expand Up @@ -718,7 +717,7 @@ func (t *Loki) initCompactor() (services.Service, error) {

func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
objectClient, err := chunk_storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig.Config, t.clientMetrics)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeItems: 1024,
Validity: 24 * time.Hour,
TTL: 24 * time.Hour,
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger) (Cache, error
caches := []Cache{}

if cfg.EnableFifoCache {
if cfg.Fifocache.Validity == 0 && cfg.DefaultValidity != 0 {
cfg.Fifocache.Validity = cfg.DefaultValidity
if cfg.Fifocache.TTL == 0 && cfg.DefaultValidity != 0 {
cfg.Fifocache.TTL = cfg.DefaultValidity
}

if cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache, reg, logger); cache != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestMemcache(t *testing.T) {
}

func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 1e3, Validity: 1 * time.Hour},
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{MaxSizeItems: 1e3, TTL: 1 * time.Hour},
nil, log.NewNopLogger())
testCache(t, cache)
}
Expand Down
82 changes: 67 additions & 15 deletions pkg/storage/chunk/cache/fifo_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ const (
type FifoCacheConfig struct {
MaxSizeBytes string `yaml:"max_size_bytes"`
MaxSizeItems int `yaml:"max_size_items"`
Validity time.Duration `yaml:"validity"`
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
TTL time.Duration `yaml:"ttl"`

DeprecatedSize int `yaml:"size"`
DeprecatedValidity time.Duration `yaml:"validity"`
DeprecatedSize int `yaml:"size"`

PurgeInterval time.Duration
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.StringVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", "1GB", description+"Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.")
f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", time.Hour, description+"The expiry duration for the cache.")
f.DurationVar(&cfg.TTL, prefix+"fifocache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")

f.IntVar(&cfg.DeprecatedSize, prefix+"fifocache.size", 0, "Deprecated (use max-size-items or max-size-bytes instead): "+description+"The number of entries to cache. ")
f.DurationVar(&cfg.DeprecatedValidity, prefix+"fifocache.duration", 0, "Deprecated (use ttl instead): "+description+"The expiry duration for the cache.")
f.IntVar(&cfg.DeprecatedSize, prefix+"fifocache.size", 0, "Deprecated (use max-size-items or max-size-bytes instead): "+description+"The number of entries to cache.")
}

func (cfg *FifoCacheConfig) Validate() error {
Expand All @@ -70,11 +74,12 @@ type FifoCache struct {
maxSizeItems int
maxSizeBytes uint64
currSizeBytes uint64
validity time.Duration

entries map[string]*list.Element
lru *list.List

done chan struct{}

entriesAdded prometheus.Counter
entriesAddedNew prometheus.Counter
entriesEvicted prometheus.Counter
Expand Down Expand Up @@ -107,13 +112,27 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
level.Warn(logger).Log("msg", "neither fifocache.max-size-bytes nor fifocache.max-size-items is set", "cache", name)
return nil
}
return &FifoCache{

if cfg.DeprecatedValidity > 0 {
flagext.DeprecatedFlagsUsed.Inc()
level.Warn(logger).Log("msg", "running with DEPRECATED flag fifocache.interval, use fifocache.ttl instead", "cache", name)
cfg.TTL = cfg.DeprecatedValidity
}

// Set a default interval for the ticker
// This can be overwritten to a smaller value in tests
if cfg.PurgeInterval == 0 {
cfg.PurgeInterval = 1 * time.Minute
}

cache := &FifoCache{
maxSizeItems: cfg.MaxSizeItems,
maxSizeBytes: maxSizeBytes,
validity: cfg.Validity,
entries: make(map[string]*list.Element),
lru: list.New(),

done: make(chan struct{}),

entriesAdded: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "querier",
Subsystem: "cache",
Expand Down Expand Up @@ -166,7 +185,7 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
Namespace: "querier",
Subsystem: "cache",
Name: "stale_gets_total",
Help: "The total number of Get calls that had an entry which expired",
Help: "The total number of Get calls that had an entry which expired (deprecated)",
ConstLabels: prometheus.Labels{"cache": name},
}),

Expand All @@ -178,6 +197,43 @@ func NewFifoCache(name string, cfg FifoCacheConfig, reg prometheus.Registerer, l
ConstLabels: prometheus.Labels{"cache": name},
}),
}

if cfg.TTL > 0 {
go cache.runPruneJob(cfg.PurgeInterval, cfg.TTL)
}

return cache
}

// runPruneJob periodically evicts expired entries from the cache.
// It fires every interval and stops when Stop closes c.done,
// so the goroutine's lifetime is bounded by the cache's.
func (c *FifoCache) runPruneJob(interval, ttl time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			c.pruneExpiredItems(ttl)
		case <-c.done:
			return
		}
	}
}

// pruneExpiredItems removes every cache entry whose last update is older
// than ttl, releasing its memory accounting and updating eviction metrics.
// It takes the write lock for the whole pass, so concurrent Get/Put calls
// block until pruning finishes.
func (c *FifoCache) pruneExpiredItems(ttl time.Duration) {
	c.lock.Lock()
	defer c.lock.Unlock()

	// Deleting keys from a map while ranging over it is well-defined in Go.
	for k, v := range c.entries {
		entry := v.Value.(*cacheEntry)
		if time.Since(entry.updated) > ttl {
			// list.Remove already returns the element's value; the original
			// discarded type assertion (`_ = ...(*cacheEntry)`) added only a
			// potential panic path, so drop it.
			c.lru.Remove(v)
			delete(c.entries, k)
			c.currSizeBytes -= sizeOf(entry)
			c.entriesCurrent.Dec()
			c.entriesEvicted.Inc()
		}
	}
}

// Fetch implements Cache.
Expand Down Expand Up @@ -214,6 +270,8 @@ func (c *FifoCache) Stop() {
c.lock.Lock()
defer c.lock.Unlock()

close(c.done)

c.entriesEvicted.Add(float64(c.lru.Len()))

c.entries = make(map[string]*list.Element)
Expand Down Expand Up @@ -285,13 +343,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) ([]byte, bool) {
element, ok := c.entries[key]
if ok {
entry := element.Value.(*cacheEntry)
if c.validity == 0 || time.Since(entry.updated) < c.validity {
return entry.value, true
}

c.totalMisses.Inc()
c.staleGets.Inc()
return nil, false
return entry.value, true
}

c.totalMisses.Inc()
Expand Down
Loading