diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index ade8fca366b8..31932832f701 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -44,11 +44,11 @@ type Config struct { ParamString string } -func createLogClusterCache(maxSize int) *LogClusterCache { +func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache { if maxSize == 0 { maxSize = math.MaxInt } - cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, nil) + cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, onEvict) return &LogClusterCache{ cache: cache, } @@ -146,16 +146,21 @@ func DefaultConfig() *Config { } } -func New(config *Config) *Drain { +func New(config *Config, metrics *Metrics) *Drain { if config.LogClusterDepth < 3 { panic("depth argument must be at least 3") } config.maxNodeDepth = config.LogClusterDepth - 2 + var evictFn func(int, *LogCluster) + if metrics != nil { + evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } + } d := &Drain{ config: config, rootNode: createNode(), - idToCluster: createLogClusterCache(config.MaxClusters), + idToCluster: createLogClusterCache(config.MaxClusters, evictFn), + metrics: metrics, } return d } @@ -165,6 +170,7 @@ type Drain struct { rootNode *Node idToCluster *LogClusterCache clustersCounter int + metrics *Metrics } func (d *Drain) Clusters() []*LogCluster { @@ -195,6 +201,9 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) matchCluster.append(model.TimeFromUnixNano(ts)) d.idToCluster.Set(clusterID, matchCluster) d.addSeqToPrefixTree(d.rootNode, matchCluster) + if d.metrics != nil { + d.metrics.PatternsDetectedTotal.Inc() + } } else { newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens) matchCluster.Tokens = newTemplateTokens diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 72b80aeb67d3..e9709aed3fec 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -20,9 +20,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }{ { // High variation leads to many patterns including some that are too generic (many tokens matched) and some that are too specific (too few matchers) - name: "Generate patterns on high variation logfmt logs", - drain: New(DefaultConfig()), - inputFile: "testdata/agent-logfmt.txt", + name: `Generate patterns on high variation logfmt logs`, + drain: New(DefaultConfig(), nil), + inputFile: `testdata/agent-logfmt.txt`, patterns: []string{ "ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg=\"Adding target\" key=\"/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", batch_kubernetes_io_job_name=\\\"testcoordinator-job-2665838\\\", container=\\\"testcoordinator\\\", controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", job=\\\"k6-cloud/testcoordinator\\\", job_name=\\\"testcoordinator-job-2665838\\\", name=\\\"testcoordinator\\\", namespace=\\\"k6-cloud\\\", pod=\\\"testcoordinator-job-2665838-9g8ds\\\"}\"", "<_> <_> level=info component=logs logs_config=default <_> target\" <_> <_> <_> <_> <_> <_>", @@ -42,9 +42,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { // Lower variation leads to fewer patterns including some with limited value (single lines, no matchers) - name: "Generate patterns on low variation logfmt logs", - drain: New(DefaultConfig()), - inputFile: "testdata/ingester-logfmt.txt", + name: `Generate patterns on low variation logfmt logs`, + drain: New(DefaultConfig(), nil), + inputFile: `testdata/ingester-logfmt.txt`, patterns: []string{ "<_> caller=head.go:216 level=debug tenant=987678 msg=\"profile is empty after delta computation\" metricName=memory", "ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg=\"GET /debug/pprof/delta_mutex (200) 1.161082ms\"", @@ -53,9 +53,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { // Lower variation logs in json leads to a high number of patterns with very few matchers - name: "Generate patterns on json formatted logs", - drain: New(DefaultConfig()), - inputFile: "testdata/drone-json.txt", + name: `Generate patterns on json formatted logs`, + drain: New(DefaultConfig(), nil), + inputFile: `testdata/drone-json.txt`, patterns: []string{ "<_> capacity <_>", "<_> capacity changes <_>", @@ -96,7 +96,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for distributor logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/distributor-logfmt.txt", patterns: []string{ `<_> caller=http.go:194 level=debug <_> <_> msg="POST <_> <_> <_>`, @@ -104,7 +104,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for journald logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/journald.txt", patterns: []string{ "2024-05-07T11:59:43.484606Z INFO ExtHandler ExtHandler Downloading agent manifest", @@ -195,7 +195,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for kafka logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/kafka.txt", patterns: []string{ `[2024-05-07 <_> INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-0, dir=/bitnami/kafka/data] Deleting segment files <_> size=948, <_> <_> (kafka.log.LocalLog$)`, @@ -219,7 +219,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for kubernetes logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/kubernetes.txt", patterns: []string{ "I0507 12:04:17.596484 1 highnodeutilization.go:107] \"Criteria for a node below target utilization\" CPU=50 Mem=50 Pods=100", @@ -252,7 +252,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for vault logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/vault.txt", patterns: []string{ "<_> [INFO] expiration: revoked lease: <_>", @@ -260,7 +260,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, { name: "Patterns for calico logs", - drain: New(DefaultConfig()), + drain: New(DefaultConfig(), nil), inputFile: "testdata/calico.txt", patterns: []string{ `2024-05-08 <_> [DEBUG][216945] felix/table.go 870: Found forward-reference <_> ipVersion=0x4 <_> <_> [0:0]" table="nat"`, @@ -383,8 +383,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { inputLines []string }{ { - name: `should match each line against a pattern`, - drain: New(DefaultConfig()), + name: "should match each line against a pattern", + drain: New(DefaultConfig(), nil), inputLines: []string{ "test test test", "test test test", @@ -393,8 +393,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { }, }, { - name: `should also match newlines`, - drain: New(DefaultConfig()), + name: "should also match newlines", + drain: New(DefaultConfig(), nil), inputLines: []string{ `test test test `, @@ -413,7 +413,6 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { for _, line := range tt.inputLines { tt.drain.Train(line, 0) } - t.Log(`Learned clusters`, tt.drain.Clusters()) for _, line := range tt.inputLines { match := tt.drain.Match(line) @@ -432,8 +431,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) inputLines []string }{ { - name: `should extract patterns that all lines match`, - drain: New(DefaultConfig()), + name: "should extract patterns that all lines match", + drain: New(DefaultConfig(), nil), inputLines: []string{ "test 1 test", "test 2 test", @@ -442,8 +441,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, }, { - name: `should extract patterns that match if line ends with newlines`, - drain: New(DefaultConfig()), + name: "should extract patterns that match if line ends with newlines", + drain: New(DefaultConfig(), nil), inputLines: []string{ `test 1 test `, @@ -456,8 +455,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, }, { - name: `should extract patterns that match if line ends with empty space`, - drain: New(DefaultConfig()), + name: "should extract patterns that match if line ends with empty space", + drain: New(DefaultConfig(), nil), inputLines: []string{ `test 1 test `, `test 2 test `, @@ -466,8 +465,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, }, { - name: `should extract patterns that match if line starts with empty space`, - drain: New(DefaultConfig()), + name: "should extract patterns that match if line starts with empty space", + drain: New(DefaultConfig(), nil), inputLines: []string{ ` test 1 test`, ` test 2 test`, @@ -484,7 +483,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) } require.Equal(t, 1, len(tt.drain.Clusters())) cluster := tt.drain.Clusters()[0] - t.Log(`Extracted cluster: `, cluster) matcher, err := pattern.ParseLineFilter([]byte(cluster.String())) require.NoError(t, err) diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go new file mode 100644 index 000000000000..b09ef1230127 --- /dev/null +++ b/pkg/pattern/drain/metrics.go @@ -0,0 +1,8 @@ +package drain + +import "github.com/prometheus/client_golang/prometheus" + +type Metrics struct { + PatternsEvictedTotal prometheus.Counter + PatternsDetectedTotal prometheus.Counter +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index af2e842c28b8..1cb91a1cda29 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -273,7 +273,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstance(instanceID, i.logger) + inst, err = newInstance(instanceID, i.logger, i.metrics) if err != nil { return nil, err } diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index bfbaeb92aedb..13315b0a13f1 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -67,7 +67,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte } func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse { - d := drain.New(drain.DefaultConfig()) + d := drain.New(drain.DefaultConfig(), nil) for _, p := range resp.Series { d.TrainPattern(p.Pattern, p.Samples) } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 16d5d0f04189..eff054b9ec04 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -18,7 +18,7 @@ import ( func TestInstancePushQuery(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - inst, err := newInstance("foo", log.NewNopLogger()) + inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test")) require.NoError(t, err) err = inst.Push(context.Background(), &push.PushRequest{ diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 7ac0099edec3..f6efa7de0443 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -30,9 +30,10 @@ type instance struct { streams *streamsMap index *index.BitPrefixInvertedIndex logger log.Logger + metrics *ingesterMetrics } -func newInstance(instanceID string, logger log.Logger) (*instance, error) { +func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) { index, err := index.NewBitPrefixWithShards(indexShards) if err != nil { return nil, err @@ -43,6 +44,7 @@ func newInstance(instanceID string, logger log.Logger) (*instance, error) { instanceID: instanceID, streams: newStreamsMap(), index: index, + metrics: metrics, } i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint) return i, nil @@ -138,7 +140,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream } fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) - s, err := newStream(fp, sortedLabels) + s, err := newStream(fp, sortedLabels, i.metrics) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index e4a9c146c36f..cb814d56905a 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -6,7 +6,9 @@ import ( ) type ingesterMetrics struct { - flushQueueLength prometheus.Gauge + flushQueueLength prometheus.Gauge + patternsDiscardedTotal prometheus.Counter + patternsDetectedTotal prometheus.Counter } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -17,5 +19,17 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), + patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "patterns_evicted_total", + Help: "The total number of patterns evicted from the LRU cache.", + }), + patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "patterns_detected_total", + Help: "The total number of patterns detected from incoming log lines.", + }), } } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 8321fce9f647..f3aad280250d 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -27,13 +27,17 @@ type stream struct { func newStream( fp model.Fingerprint, labels labels.Labels, + metrics *ingesterMetrics, ) (*stream, error) { return &stream{ fp: fp, labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), - patterns: drain.New(drain.DefaultConfig()), + patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ + PatternsEvictedTotal: metrics.patternsDiscardedTotal, + PatternsDetectedTotal: metrics.patternsDetectedTotal, + }), }, nil } diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index cd76336b2e60..f2218816b111 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -16,7 +16,7 @@ import ( func TestAddStream(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs) + stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test")) require.NoError(t, err) err = stream.Push(context.Background(), []push.Entry{ @@ -44,7 +44,7 @@ func TestAddStream(t *testing.T) { func TestPruneStream(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) - stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs) + stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test")) require.NoError(t, err) err = stream.Push(context.Background(), []push.Entry{