Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Pattern ingesters add a limiter for high eviction rate #13464

Merged
merged 4 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,17 @@ pattern_ingester:
# first flush check is delayed by a random time up to 0.8x the flush check
# period. Additionally, there is +/- 1% jitter added to the interval.
# CLI flag: -pattern-ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
[flush_check_period: <duration> | default = 1m]

# The maximum number of detected pattern clusters that can be created by
# streams.
# CLI flag: -pattern-ingester.max-clusters
[max_clusters: <int> | default = 300]

# The maximum eviction ratio of patterns per stream. Once that ratio is
# reached, the stream will throttled pattern detection.
# CLI flag: -pattern-ingester.max-eviction-ratio
[max_eviction_ratio: <float> | default = 0.25]

# Configures the metric aggregation and storage behavior of the pattern
# ingester.
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/chunk/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

const (
TimeResolution = model.Time(int64(time.Second*10) / 1e6)
MaxChunkTime = 1 * time.Hour
MaxChunkTime = 15 * time.Minute
)

func TruncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step }
86 changes: 45 additions & 41 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (
)

type Config struct {
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
MaxEvictionRatio float64
}

func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
Expand All @@ -60,29 +61,13 @@ type LogClusterCache struct {
}

func (c *LogClusterCache) Values() []*LogCluster {
values := make([]*LogCluster, 0)
for _, key := range c.cache.Keys() {
if value, ok := c.cache.Peek(key); ok {
values = append(values, value)
}
}
return values
return c.cache.Values()
}

func (c *LogClusterCache) Set(key int, cluster *LogCluster) {
c.cache.Add(key, cluster)
}

func (c *LogClusterCache) Iterate(fn func(*LogCluster) bool) {
for _, key := range c.cache.Keys() {
if value, ok := c.cache.Peek(key); ok {
if !fn(value) {
return
}
}
}
}

func (c *LogClusterCache) Get(key int) *LogCluster {
cluster, ok := c.cache.Get(key)
if !ok {
Expand Down Expand Up @@ -140,10 +125,11 @@ func DefaultConfig() *Config {
// Both SimTh and MaxClusterDepth impact branching factor: the greater
// MaxClusterDepth and SimTh, the less the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
MaxEvictionRatio: 0.25,
}
}

Expand All @@ -152,10 +138,17 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2
var evictFn func(int, *LogCluster)
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }

d := &Drain{
config: config,
rootNode: createNode(),
metrics: metrics,
maxAllowedLineLength: 3000,
format: format,
}

limiter := newLimiter(config.MaxEvictionRatio)

var tokenizer LineTokenizer
switch format {
case FormatJSON:
Expand All @@ -165,16 +158,20 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
default:
tokenizer = newPunctuationTokenizer()
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
tokenizer: tokenizer,
maxAllowedLineLength: 3000,
format: format,
}
d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) {
if metrics != nil {
if d.pruning {
metrics.PatternsPrunedTotal.Inc()
} else {
metrics.PatternsEvictedTotal.Inc()
}
}
if !d.pruning {
limiter.Evict()
}
})
d.tokenizer = tokenizer
d.limiter = limiter
return d
}

Expand All @@ -189,6 +186,8 @@ type Drain struct {
format string
tokens []string
state interface{}
limiter *limiter
pruning bool
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -200,6 +199,9 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts
}

func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
if len(content) > d.maxAllowedLineLength {
return nil
}
Expand Down Expand Up @@ -325,7 +327,9 @@ func (d *Drain) pruneTree(node *Node) int {
}

func (d *Drain) Delete(cluster *LogCluster) {
d.pruning = true
d.idToCluster.cache.Remove(cluster.id)
d.pruning = false
}

func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
Expand Down
51 changes: 51 additions & 0 deletions pkg/pattern/drain/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package drain

import (
"time"
)

type limiter struct {
added int64
evicted int64
maxPercentage float64
blockedUntil time.Time
}

func newLimiter(maxPercentage float64) *limiter {
return &limiter{
maxPercentage: maxPercentage,
}
}

func (l *limiter) Allow() bool {
if !l.blockedUntil.IsZero() {
if time.Now().Before(l.blockedUntil) {
return false
}
l.reset()
}
if l.added == 0 {
l.added++
return true
}
if float64(l.evicted)/float64(l.added) > l.maxPercentage {
l.block()
return false
}
l.added++
return true
}

func (l *limiter) Evict() {
l.evicted++
}

func (l *limiter) reset() {
l.added = 0
l.evicted = 0
l.blockedUntil = time.Time{}
}

func (l *limiter) block() {
l.blockedUntil = time.Now().Add(10 * time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about sampling instead of blocking completely for some time?
We could accept a small %age of the writes to keep building patterns. It might make a nicer experience for the user?

Copy link
Contributor Author

@cyriltovena cyriltovena Jul 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's good suggestion. I would like to keep it that way for now until we have all clusters under control and then improve this later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bare in mind that it will still grow the tree if we do sampling. So I think it's a bit more risky.

In this case it will constantly lock the streams if it keeps evicting.

}
70 changes: 70 additions & 0 deletions pkg/pattern/drain/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package drain

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestNewLimiter(t *testing.T) {
maxPercentage := 0.5
l := newLimiter(maxPercentage)
require.NotNil(t, l, "expected non-nil limiter")
require.Equal(t, maxPercentage, l.maxPercentage, "expected maxPercentage to match")
require.Equal(t, int64(0), l.added, "expected added to be 0")
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
}

func TestLimiterAllow(t *testing.T) {
maxPercentage := 0.5
l := newLimiter(maxPercentage)

// Test allowing when no evictions
require.True(t, l.Allow(), "expected Allow to return true initially")

// Test allowing until evictions exceed maxPercentage
for i := 0; i < 2; i++ {
require.True(t, l.Allow(), "expected Allow to return true %d", i)
l.Evict()
}

// Evict to exceed maxPercentage
l.Evict()
require.False(t, l.Allow(), "expected Allow to return false after evictions exceed maxPercentage")

// Test blocking time
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")

// Fast forward time to simulate block duration passing
l.blockedUntil = time.Now().Add(-1 * time.Minute)
require.True(t, l.Allow(), "expected Allow to return true after block duration")
}

func TestLimiterEvict(t *testing.T) {
l := newLimiter(0.5)
l.Evict()
require.Equal(t, int64(1), l.evicted, "expected evicted to be 1")
l.Evict()
require.Equal(t, int64(2), l.evicted, "expected evicted to be 2")
}

func TestLimiterReset(t *testing.T) {
l := newLimiter(0.5)
l.added = 10
l.evicted = 5
l.blockedUntil = time.Now().Add(10 * time.Minute)
l.reset()
require.Equal(t, int64(0), l.added, "expected added to be 0")
require.Equal(t, int64(0), l.evicted, "expected evicted to be 0")
require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero")
}

func TestLimiterBlock(t *testing.T) {
l := newLimiter(0.5)
l.block()
require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set")
require.False(t, l.Allow())
require.True(t, l.blockedUntil.After(time.Now()), "expected blockedUntil to be in the future")
}
1 change: 1 addition & 0 deletions pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func DetectLogFormat(line string) string {

type Metrics struct {
PatternsEvictedTotal prometheus.Counter
PatternsPrunedTotal prometheus.Counter
PatternsDetectedTotal prometheus.Counter
TokensPerLine prometheus.Observer
StatePerLine prometheus.Observer
Expand Down
14 changes: 13 additions & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/clientpool"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/metric"
"github.com/grafana/loki/v3/pkg/util"
util_log "github.com/grafana/loki/v3/pkg/util/log"
Expand All @@ -39,6 +40,8 @@ type Config struct {
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`

MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
// For testing.
Expand All @@ -53,7 +56,9 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {

fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.")
fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.")
fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.")
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -85,6 +90,7 @@ type Ingester struct {

metrics *ingesterMetrics
chunkMetrics *metric.ChunkMetrics
drainCfg *drain.Config
}

func New(
Expand All @@ -97,6 +103,10 @@ func New(
chunkMetrics := metric.NewChunkMetrics(registerer, metricsNamespace)
registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer)

drainCfg := drain.DefaultConfig()
drainCfg.MaxClusters = cfg.MaxClusters
drainCfg.MaxEvictionRatio = cfg.MaxEvictionRatio

i := &Ingester{
cfg: cfg,
logger: log.With(logger, "component", "pattern-ingester"),
Expand All @@ -106,6 +116,7 @@ func New(
instances: make(map[string]*instance),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
loopQuit: make(chan struct{}),
drainCfg: drainCfg,
}
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
var err error
Expand Down Expand Up @@ -357,6 +368,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
i.logger,
i.metrics,
i.chunkMetrics,
i.drainCfg,
i.cfg.MetricAggregation,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"
"github.com/grafana/loki/v3/pkg/pattern/metric"

Expand All @@ -28,6 +29,7 @@ func setup(t *testing.T) *instance {
log.NewNopLogger(),
newIngesterMetrics(nil, "test"),
metric.NewChunkMetrics(nil, "test"),
drain.DefaultConfig(),
metric.AggregationConfig{
Enabled: true,
},
Expand Down
Loading
Loading