Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

embedded-cache: Bring fifocache and groupcache into single tent. #6821

Merged
merged 21 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/loki/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type Config struct {
// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`

// GroupCacheConfig is the configuration to use when groupcache is enabled.
// MemorycacheConfig is the configuration to use when in-memory cache is enabled.
//
// This is a common config because, when enabled, it is used across all caches
GroupCacheConfig cache.GroupCacheConfig `yaml:"groupcache"`
Memorycache cache.MemorycacheConfig `yaml:"memorycache"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -64,7 +64,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")

// flags that only live in common
c.GroupCacheConfig.RegisterFlagsWithPrefix("common.groupcache", "", f)
c.Memorycache.RegisterFlagsWithPrefix("common.memorycache", "", f)

f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
}
Expand Down
38 changes: 21 additions & 17 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr
r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr
r.Common.GroupCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
if r.Common.Memorycache.Distributed {
r.Common.Memorycache.Ring.InstanceAddr = r.Common.InstanceAddr
}
}

if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) {
Expand All @@ -164,15 +166,17 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
}
r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames
r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
if r.Common.Memorycache.Distributed {
r.Common.Memorycache.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
}
}
}

// applyCommonCacheConfigs applies to Loki components the cache-related configurations under the common config section
// NOTE: only used for GroupCache at the moment
// TODO: apply to other caches as well
func applyCommonCacheConfigs(r, _ *ConfigWrapper) {
if r.Config.Common.GroupCacheConfig.Enabled {
if r.Config.Common.Memorycache.Enabled && r.Config.Common.Memorycache.Distributed {
r.Config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache = true
r.Config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache = true
r.Config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache = true
Expand Down Expand Up @@ -305,16 +309,16 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
}

// GroupCacheRing
if mergeWithExisting || reflect.DeepEqual(r.Common.GroupCacheConfig.Ring, defaults.Common.GroupCacheConfig.Ring) {
r.Common.GroupCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Common.GroupCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Common.GroupCacheConfig.Ring.InstancePort = rc.InstancePort
r.Common.GroupCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
r.Common.GroupCacheConfig.Ring.InstanceID = rc.InstanceID
r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Common.GroupCacheConfig.Ring.InstanceZone = rc.InstanceZone
r.Common.GroupCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Common.GroupCacheConfig.Ring.KVStore = rc.KVStore
if mergeWithExisting || reflect.DeepEqual(r.Common.Memorycache.Ring, defaults.Common.Memorycache.Ring) {
r.Common.Memorycache.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
r.Common.Memorycache.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
r.Common.Memorycache.Ring.InstancePort = rc.InstancePort
r.Common.Memorycache.Ring.InstanceAddr = rc.InstanceAddr
r.Common.Memorycache.Ring.InstanceID = rc.InstanceID
r.Common.Memorycache.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
r.Common.Memorycache.Ring.InstanceZone = rc.InstanceZone
r.Common.Memorycache.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
r.Common.Memorycache.Ring.KVStore = rc.KVStore
}
}

Expand Down Expand Up @@ -350,7 +354,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
if err != nil {
return err
}
cfg.Common.GroupCacheConfig.Ring.TokensFilePath = f
cfg.Common.Memorycache.Ring.TokensFilePath = f
return nil
}

Expand Down Expand Up @@ -429,8 +433,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface)
}

if reflect.DeepEqual(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.GroupCacheConfig.Ring.InstanceInterfaceNames) {
cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
if reflect.DeepEqual(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, defaults.Common.Memorycache.Ring.InstanceInterfaceNames) {
cfg.Common.Memorycache.Ring.InstanceInterfaceNames = append(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, loopbackIface)
}
}

Expand All @@ -445,7 +449,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
r.IndexGateway.Ring.KVStore.Store = memberlistStr
r.Common.GroupCacheConfig.Ring.KVStore.Store = memberlistStr
r.Common.Memorycache.Ring.KVStore.Store = memberlistStr
}

var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
Expand Down
21 changes: 15 additions & 6 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,27 @@ func (t *Loki) initRing() (_ services.Service, err error) {
}

func (t *Loki) initGroupcache() (_ services.Service, err error) {
if !t.Cfg.Common.GroupCacheConfig.Enabled {
if !t.Cfg.Common.Memorycache.Enabled ||
!t.Cfg.Common.Memorycache.Distributed {
return nil, nil
}

t.Cfg.Common.GroupCacheConfig.Ring.ListenPort = t.Cfg.Common.GroupCacheConfig.ListenPort
rm, err := cache.NewGroupcacheRingManager(t.Cfg.Common.GroupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
groupConfig := cache.GroupCacheConfig{
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
Enabled: true,
Ring: t.Cfg.Common.Memorycache.Ring,
CapacityMB: t.Cfg.Common.Memorycache.MaxSizeMB,
ListenPort: t.Cfg.Common.Memorycache.ListenPort,
}

rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, gerrors.Wrap(err, "new groupcache ring manager")
}

t.groupcacheRingManager = rm
t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.groupcacheRingManager)

gc, err := cache.NewGroupCache(rm, t.Cfg.Common.GroupCacheConfig, t.Server, util_log.Logger, prometheus.DefaultRegisterer)
gc, err := cache.NewGroupCache(rm, groupConfig, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand All @@ -179,7 +186,7 @@ func (t *Loki) initGroupcache() (_ services.Service, err error) {

// The index cache generates too much traffic to be used. Make it a fifo cache
t.Cfg.StorageConfig.IndexQueriesCacheConfig.EnableFifoCache = true
t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.GroupCacheConfig.CapacityMB * 1e6)
t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.Memorycache.MaxSizeMB * 1e6)

return t.groupcacheRingManager, nil
}
Expand Down Expand Up @@ -901,7 +908,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
t.Cfg.Common.GroupCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
if t.Cfg.Common.Memorycache.Distributed {
t.Cfg.Common.Memorycache.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
}

t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)

Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/chunk/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ type Config struct {
Memcache MemcachedConfig `yaml:"memcached"`
MemcacheClient MemcachedClientConfig `yaml:"memcached_client"`
Redis RedisConfig `yaml:"redis"`
Fifocache FifoCacheConfig `yaml:"fifocache"`
Memorycache MemorycacheConfig `yaml:"memorycache"`
Fifocache FifoCacheConfig `yaml:"fifocache"` // depreciated
kavirajk marked this conversation as resolved.
Show resolved Hide resolved

// GroupcacheConfig is a local GroupCache config per cache
GroupCacheConfig GroupConfig `yaml:"groupcache"`
GroupCacheConfig GroupConfig `yaml:"groupcache"` // depreicated
kavirajk marked this conversation as resolved.
Show resolved Hide resolved

// This is to name the cache metrics properly.
Prefix string `yaml:"prefix" doc:"hidden"`
Expand Down
20 changes: 9 additions & 11 deletions pkg/storage/chunk/cache/groupcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cache
import (
"context"
"crypto/tls"
"flag"
"fmt"
"net"
"net/http"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/mailgun/groupcache/v2"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/server"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -74,22 +72,22 @@ type GroupConfig struct {
CapacityMB int64 `yaml:"capacity_mb,omitempty"`
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)
// // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
kavirajk marked this conversation as resolved.
Show resolved Hide resolved
// func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) {
// cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)

f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled")
f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication")
f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+
"NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.")
}
// f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled")
// f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication")
// f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+
// "NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.")
// }

type ringManager interface {
Addr() string
Ring() ring.ReadRing
}

func NewGroupCache(rm ringManager, config GroupCacheConfig, server *server.Server, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error) {
func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error) {
addr := fmt.Sprintf("http://%s", rm.Addr())
level.Info(logger).Log("msg", "groupcache local address set to", "addr", addr)

Expand Down
37 changes: 37 additions & 0 deletions pkg/storage/chunk/cache/memorycache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cache

import (
"flag"
"time"
)

const (
DefaultPurgeInterval = 1 * time.Minute
)

// MemorycacheConfig represents in-process memory cache config.
// It can also be distributed sharding keys across peers when microservice
// or SSD mode.
type MemorycacheConfig struct {
Distributed bool `yaml:"distributed,omitempty"`
Enabled bool `yaml:"enabled,omitempty"`
MaxSizeMB int64 `yaml:"max_size_mb"`
MaxItems int `yaml:"max_items"`
TTL time.Duration `yaml:"ttl"`

// PurgeInterval tell how often should we remove keys that are expired.
// by default it takes `DefaultPurgeInterval`
PurgeInterval time.Duration

// distributed cache configs. Have no meaning if `Distributed=false`.
Ring RingCfg `yaml:"ring,omitempty"`
ListenPort int `yaml:"listen_port,omitempty"`
}

func (cfg *MemorycacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
f.Int64Var(&cfg.MaxSizeMB, prefix+"memorycache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
f.IntVar(&cfg.MaxItems, prefix+"memorycache.max-items", 0, description+"Maximum number of entries in the cache.")
f.DurationVar(&cfg.TTL, prefix+"memorycache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)
f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication")
}