Skip to content

Commit

Permalink
embedded-cache: Bring fifocache and groupcache into single tent. (g…
Browse files Browse the repository at this point in the history
…rafana#6821)

* chore(groupcache): Rename groupcache into memcache{distributed: true}

1. Introduced new config called memorycache config.
2. It has flag `distributed`. By enabling it, we use groupcache

Going forward. We will bring fifocache into memcache config(with distributed:false)

This PR is just a POC. To show how we can simplify in-built memory cache config and bring it into single config.

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Memorycache -> EmbeddedCache

* Bring fifocache into embeddedcache tent

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Fix edge case with enabling fifocache

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Fix missing flags for embedded cache

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Add changelog

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Warning message for fifocache config

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Make linter happy

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Fix listenport issue

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* idk. review remarks

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* PR remarks
1. update changelog and upgrade guide
2. s/Embeddedcache/EmbeddedCache/g
3. /groupcache/ handler -> /embedded-cache/
4. some typos
5. Fix some cache prefixes

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Fixing corresponding tests

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Support extra timeouts on embedded-cache

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Remove unwanted single struct field

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* fix(stats-collector): Let cache.New() wrap stats collector for distributed cache

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* PR remarks

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Deprecate max_size_items from fifocache

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* PR remarks about the flag description

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Update docs/sources/upgrading/_index.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
  • Loading branch information
2 people authored and lxwzy committed Nov 7, 2022
1 parent 1769257 commit 942ca6c
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 134 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Loki

##### Enhancements
* [6821](https://github.com/grafana/loki/pull/6821) **kavirajk**: Introduce new cache type `embedded-cache` which is an in-process cache system that runs loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode.
* [6691](https://github.com/grafana/loki/pull/6691) **dannykopping**: Update production-ready Loki cluster in docker-compose
* [6317](https://github.com/grafana/loki/pull/6317) **dannykoping**: General: add cache usage statistics
* [6444](https://github.com/grafana/loki/pull/6444) **aminesnow** Add TLS config to query frontend.
Expand Down
8 changes: 8 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ common:
kvstore:
store: inmemory

query_range:
results_cache:
cache:
embedded_cache:
enabled: true
distributed: true
max_size_mb: 100

schema_config:
configs:
- from: 2020-10-24
Expand Down
12 changes: 9 additions & 3 deletions docs/sources/upgrading/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ The output is incredibly verbose as it shows the entire internal config struct u

### Loki

#### Fifocache is deprecated

We introduced a new cache called `embedded-cache` which is an in-process cache system that make it possible to run Loki without the need for an external cache (like Memcached, Redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode.

Currently `embedded-cache` with `distributed: true` can be enabled only for results cache.

#### Evenly spread queriers across kubernetes nodes

We now evenly spread queriers across the available kubernetes nodes, but allowing more than one querier to be scheduled into the same node.
Expand Down Expand Up @@ -87,7 +93,7 @@ limits_config:
split_queries_by_interval: 10m
```
In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.**
In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.**

Additionally, it has a new default value of `30m` rather than `0`.

Expand Down Expand Up @@ -130,7 +136,7 @@ Meanwhile, the legacy format is a string in the following format:
* `parallelise_shardable_queries` under the `query_range` config now defaults to `true`.
* `split_queries_by_interval` under the `limits_config` config now defaults to `30m`, it was `0s`.
* `max_chunk_age` in the `ingester` config now defaults to `2h` previously it was `1h`.
* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain. If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters.
* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain. If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters.
* `max_concurrent` under the `querier` config now defaults to `10` instead of `20`.
* `match_max_concurrent` under the `frontend_worker` config now defaults to true, this supersedes the `parallelism` setting which can now be removed from your config. Controlling query parallelism of a single process can now be done with the `querier` `max_concurrent` setting.
* `flush_op_timeout` under the `ingester` configuration block now defaults to `10m`, increased from `10s`. This can help when replaying a large WAL on Loki startup, and avoid `msg="failed to flush" ... context deadline exceeded` errors.
Expand Down Expand Up @@ -378,7 +384,7 @@ server:
grpc_server_ping_without_stream_allowed: true
```
[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change.
[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change.
#### Some metric prefixes have changed from `cortex_` to `loki_`
Expand Down
14 changes: 5 additions & 9 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package common
import (
"flag"

"github.com/grafana/loki/pkg/storage/chunk/cache"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/netutil"

"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/pkg/storage/chunk/client/azure"
"github.com/grafana/loki/pkg/storage/chunk/client/baidubce"
Expand Down Expand Up @@ -46,10 +45,8 @@ type Config struct {
// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`

// GroupCacheConfig is the configuration to use when groupcache is enabled.
//
// This is a common config because, when enabled, it is used across all caches
GroupCacheConfig cache.GroupCacheConfig `yaml:"groupcache"`
// Global embedded-cache config. Independent of what type of cache, we need some singleton configs like Ring configuration when running in distributed fashion.
EmbeddedCacheConfig cache.EmbeddedCacheSingletonConfig `yaml:"embedded_cache"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -63,10 +60,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

// flags that only live in common
c.GroupCacheConfig.RegisterFlagsWithPrefix("common.groupcache", "", f)

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")

c.EmbeddedCacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f)
}

type Storage struct {
Expand Down
52 changes: 22 additions & 30 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {

applyInstanceConfigs(r, &defaults)

applyCommonCacheConfigs(r, &defaults)

applyCommonReplicationFactor(r, &defaults)

applyDynamicRingConfigs(r, &defaults)
Expand Down Expand Up @@ -166,7 +164,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr
r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr
r.Common.GroupCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
}
}

if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) {
Expand All @@ -175,18 +175,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames
r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
}
}

// applyCommonCacheConfigs applies to Loki components the cache-related configurations under the common config section
// NOTE: only used for GroupCache at the moment
// TODO: apply to other caches as well
func applyCommonCacheConfigs(r, _ *ConfigWrapper) {
if r.Config.Common.GroupCacheConfig.Enabled {
r.Config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache = true
r.Config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache = true
r.Config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache = true
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
}
}
}

Expand Down Expand Up @@ -315,17 +306,18 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
r.IndexGateway.Ring.KVStore = rc.KVStore
}

// GroupCacheRing
if mergeWithExisting || reflect.DeepEqual(r.Common.GroupCacheConfig.Ring, defaults.Common.GroupCacheConfig.Ring) {
r.Common.GroupCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Common.GroupCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Common.GroupCacheConfig.Ring.InstancePort = rc.InstancePort
r.Common.GroupCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
r.Common.GroupCacheConfig.Ring.InstanceID = rc.InstanceID
r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Common.GroupCacheConfig.Ring.InstanceZone = rc.InstanceZone
r.Common.GroupCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Common.GroupCacheConfig.Ring.KVStore = rc.KVStore
// EmbeddedCache distributed ring.
if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() &&
(mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedCacheConfig.Ring, defaults.Common.EmbeddedCacheConfig.Ring)) {
r.Common.EmbeddedCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Common.EmbeddedCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Common.EmbeddedCacheConfig.Ring.InstancePort = rc.InstancePort
r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
r.Common.EmbeddedCacheConfig.Ring.InstanceID = rc.InstanceID
r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Common.EmbeddedCacheConfig.Ring.InstanceZone = rc.InstanceZone
r.Common.EmbeddedCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Common.EmbeddedCacheConfig.Ring.KVStore = rc.KVStore
}
}

Expand Down Expand Up @@ -361,7 +353,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
if err != nil {
return err
}
cfg.Common.GroupCacheConfig.Ring.TokensFilePath = f
cfg.Common.EmbeddedCacheConfig.Ring.TokensFilePath = f
return nil
}

Expand Down Expand Up @@ -440,8 +432,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.GroupCacheConfig.Ring.InstanceInterfaceNames) {
cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
if reflect.DeepEqual(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames) {
cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
}
}

Expand All @@ -456,7 +448,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.Common.GroupCacheConfig.Ring.KVStore.Store = memberlistStr
r.Common.EmbeddedCacheConfig.Ring.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
37 changes: 20 additions & 17 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,23 +821,24 @@ ingester:
})
})

t.Run("common groupcache setting is applied to chunk, index, and result caches", func(t *testing.T) {
t.Run("embedded-cache setting is applied to result caches", func(t *testing.T) {
// ensure they are all false by default
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
assert.False(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)

configFileString := `---
common:
groupcache:
enabled: true`
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
distributed: true`

config, _ = testContext(configFileString, nil)

assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
assert.True(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)
})
}

Expand Down Expand Up @@ -867,16 +868,18 @@ chunk_store_config:
assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)
})

t.Run("no FIFO cache enabled by default if GroupCache is set", func(t *testing.T) {
t.Run("if distributed cache is set for results cache, FIFO cache should be disabled.", func(t *testing.T) {
configFileString := `---
common:
groupcache:
enabled: true`
query_range:
results_cache:
cache:
embedded_cache:
enabled: true
distributed: true`

config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)
assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache)
assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
assert.True(t, config.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed())
assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache)
})

t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ type Loki struct {
queryScheduler *scheduler.Scheduler
usageReport *usagestats.Reporter
indexGatewayRingManager *indexgateway.RingManager
groupcacheRingManager *cache.GroupcacheRingManager
embeddedcacheRingManager *cache.GroupcacheRingManager

clientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
Expand Down Expand Up @@ -476,7 +476,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
mm.RegisterModule(GroupCache, t.initGroupcache, modules.UserInvisibleModule)
mm.RegisterModule(Embededcache, t.initEmbeddedCache, modules.UserInvisibleModule)
mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
Expand All @@ -503,16 +503,16 @@ func (t *Loki) setupModuleManager() error {
// Add dependencies
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
GroupCache: {RuntimeConfig, Server, MemberlistKV},
Embededcache: {RuntimeConfig, Server, MemberlistKV},
UsageReport: {},
Overrides: {RuntimeConfig},
OverridesExporter: {Overrides, Server},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport},
Store: {Overrides, GroupCache, IndexGatewayRing},
Store: {Overrides, Embededcache, IndexGatewayRing},
Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport},
QueryFrontendTripperware: {Server, GroupCache, Overrides, TenantConfigs},
QueryFrontendTripperware: {Server, Embededcache, Overrides, TenantConfigs},
QueryFrontend: {QueryFrontendTripperware, UsageReport},
QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
Expand Down
Loading

0 comments on commit 942ca6c

Please sign in to comment.