Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k218] feat(max-allowed-line-length): add config to set max-allowed-line-length in pattern ingester #14076

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,10 @@ pattern_ingester:
# CLI flag: -pattern-ingester.connection-timeout
[connection_timeout: <duration> | default = 2s]

# The maximum length of log lines that can be used for pattern detection.
# CLI flag: -pattern-ingester.max-allowed-line-length
[max_allowed_line_length: <int> | default = 3000]

# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.
Expand Down
62 changes: 31 additions & 31 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ import (
)

// Config holds the tuning parameters for a Drain instance.
//
// NOTE(review): this span is a rendered diff hunk, so the field list appears
// twice: first as it was before the change, then as it is after the change
// (identical apart from the added MaxAllowedLineLength field).
//
// What the visible code establishes about the fields:
//   - maxNodeDepth is unexported and derived; New sets it to LogClusterDepth - 2.
//   - SimTh, MaxChildren, ParamString, MaxClusters and MaxEvictionRatio get
//     their defaults (0.3, 15, `<_>`, 300 and 0.25) in DefaultConfig.
//   - MaxEvictionRatio is handed to newLimiter by New.
//   - MaxAllowedLineLength is compared against len(content) in Train (a byte
//     length, since content is a string); longer lines are skipped and Train
//     returns nil. DefaultConfig sets it to 3000.
//   - ExtraDelimiters: usage is not visible in this hunk — presumably extra
//     token separators for the tokenizer; TODO confirm.
type Config struct {
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
MaxEvictionRatio float64
maxNodeDepth int
LogClusterDepth int
SimTh float64
MaxChildren int
ExtraDelimiters []string
MaxClusters int
ParamString string
MaxEvictionRatio float64
MaxAllowedLineLength int
}

func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
Expand Down Expand Up @@ -125,11 +126,12 @@ func DefaultConfig() *Config {
// Both SimTh and LogClusterDepth impact the branching factor: the greater
// LogClusterDepth and SimTh are, the lower the chance that there will be
// "similar" clusters, but the greater the footprint.
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
MaxEvictionRatio: 0.25,
SimTh: 0.3,
MaxChildren: 15,
ParamString: `<_>`,
MaxClusters: 300,
MaxEvictionRatio: 0.25,
MaxAllowedLineLength: 3000,
}
}

Expand All @@ -140,11 +142,10 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
config.maxNodeDepth = config.LogClusterDepth - 2

d := &Drain{
config: config,
rootNode: createNode(),
metrics: metrics,
maxAllowedLineLength: 3000,
format: format,
config: config,
rootNode: createNode(),
metrics: metrics,
format: format,
}

limiter := newLimiter(config.MaxEvictionRatio)
Expand Down Expand Up @@ -180,18 +181,17 @@ func New(config *Config, format string, metrics *Metrics) *Drain {
}

// Drain holds the state of one pattern-detection instance.
//
// NOTE(review): this span is a rendered diff hunk, so the field list appears
// twice: first as it was before the change, then as it is after the change.
// The only difference is that the maxAllowedLineLength field is gone from the
// second list; Train now reads the limit from config.MaxAllowedLineLength.
//
// What the visible code establishes about the fields:
//   - config, rootNode (from createNode), metrics and format are populated
//     by New.
//   - limiter is consulted first in Train; when limiter.Allow() is false,
//     Train returns nil without processing the line.
//   - tokens and state are passed into tokenizer.Tokenize on each Train call
//     and overwritten with its results.
//   - idToCluster, clustersCounter and pruning: their use is outside this
//     hunk — presumably the cluster cache, an ID counter and a pruning flag;
//     TODO confirm.
type Drain struct {
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
maxAllowedLineLength int
format string
tokens []string
state interface{}
limiter *limiter
pruning bool
config *Config
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
tokenizer LineTokenizer
format string
tokens []string
state interface{}
limiter *limiter
pruning bool
}

func (d *Drain) Clusters() []*LogCluster {
Expand All @@ -206,7 +206,7 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
if !d.limiter.Allow() {
return nil
}
if len(content) > d.maxAllowedLineLength {
if len(content) > d.config.MaxAllowedLineLength {
return nil
}
d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
Expand Down
27 changes: 17 additions & 10 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ import (
const readBatchSize = 1024

type Config struct {
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."`
ConnectionTimeout time.Duration `yaml:"connection_timeout"`
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester is enabled."`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the pattern ingester will operate and where it will register for discovery."`
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."`
MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."`
MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."`
TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."`
ConnectionTimeout time.Duration `yaml:"connection_timeout"`
MaxAllowedLineLength int `yaml:"max_allowed_line_length,omitempty" doc:"description=The maximum length of log lines that can be used for pattern detection."`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
Expand Down Expand Up @@ -91,6 +92,12 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
2*time.Second,
"Timeout for connections between the Loki and the pattern ingester.",
)
fs.IntVar(
&cfg.MaxAllowedLineLength,
"pattern-ingester.max-allowed-line-length",
drain.DefaultConfig().MaxAllowedLineLength,
"The maximum length of log lines that can be used for pattern detection.",
)
}

type TeeConfig struct {
Expand Down
Loading