Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add metrics for number of patterns detected & evicted #12918

Merged
merged 4 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ type Config struct {
ParamString string
}

func createLogClusterCache(maxSize int) *LogClusterCache {
func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache {
if maxSize == 0 {
maxSize = math.MaxInt
}
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, nil)
cache, _ := simplelru.NewLRU[int, *LogCluster](maxSize, onEvict)
return &LogClusterCache{
cache: cache,
}
Expand Down Expand Up @@ -146,16 +146,21 @@ func DefaultConfig() *Config {
}
}

func New(config *Config) *Drain {
func New(config *Config, metrics *Metrics) *Drain {
benclive marked this conversation as resolved.
Show resolved Hide resolved
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
config.maxNodeDepth = config.LogClusterDepth - 2
var evictFn func(int, *LogCluster)
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
}

d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
}
return d
}
Expand All @@ -165,6 +170,7 @@ type Drain struct {
rootNode *Node
idToCluster *LogClusterCache
clustersCounter int
metrics *Metrics
}

func (d *Drain) Clusters() []*LogCluster {
Expand Down Expand Up @@ -195,6 +201,9 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
matchCluster.append(model.TimeFromUnixNano(ts))
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
if d.metrics != nil {
d.metrics.PatternsDetectedTotal.Inc()
}
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens
Expand Down
56 changes: 27 additions & 29 deletions pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
}{
{
// High variation leads to many patterns including some that are too generic (many tokens matched) and some that are too specific (too few matchers)
name: "Generate patterns on high variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/agent-logfmt.txt",
name: `Generate patterns on high variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/agent-logfmt.txt`,
patterns: []string{
"ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg=\"Adding target\" key=\"/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", batch_kubernetes_io_job_name=\\\"testcoordinator-job-2665838\\\", container=\\\"testcoordinator\\\", controller_uid=\\\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\\\", job=\\\"k6-cloud/testcoordinator\\\", job_name=\\\"testcoordinator-job-2665838\\\", name=\\\"testcoordinator\\\", namespace=\\\"k6-cloud\\\", pod=\\\"testcoordinator-job-2665838-9g8ds\\\"}\"",
"<_> <_> level=info component=logs logs_config=default <_> target\" <_> <_> <_> <_> <_> <_>",
Expand All @@ -42,9 +42,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation leads to fewer patterns including some with limited value (single lines, no matchers)
name: "Generate patterns on low variation logfmt logs",
drain: New(DefaultConfig()),
inputFile: "testdata/ingester-logfmt.txt",
name: `Generate patterns on low variation logfmt logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/ingester-logfmt.txt`,
patterns: []string{
"<_> caller=head.go:216 level=debug tenant=987678 msg=\"profile is empty after delta computation\" metricName=memory",
"ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg=\"GET /debug/pprof/delta_mutex (200) 1.161082ms\"",
Expand All @@ -53,9 +53,9 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
// Lower variation logs in json leads to a high number of patterns with very few matchers
name: "Generate patterns on json formatted logs",
drain: New(DefaultConfig()),
inputFile: "testdata/drone-json.txt",
name: `Generate patterns on json formatted logs`,
drain: New(DefaultConfig(), nil),
inputFile: `testdata/drone-json.txt`,
patterns: []string{
"<_> capacity <_>",
"<_> capacity changes <_>",
Expand Down Expand Up @@ -96,15 +96,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for distributor logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/distributor-logfmt.txt",
patterns: []string{
`<_> caller=http.go:194 level=debug <_> <_> msg="POST <_> <_> <_>`,
},
},
{
name: "Patterns for journald logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/journald.txt",
patterns: []string{
"2024-05-07T11:59:43.484606Z INFO ExtHandler ExtHandler Downloading agent manifest",
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kafka logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kafka.txt",
patterns: []string{
`[2024-05-07 <_> INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-0, dir=/bitnami/kafka/data] Deleting segment files <_> size=948, <_> <_> (kafka.log.LocalLog$)`,
Expand All @@ -219,7 +219,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for kubernetes logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/kubernetes.txt",
patterns: []string{
"I0507 12:04:17.596484 1 highnodeutilization.go:107] \"Criteria for a node below target utilization\" CPU=50 Mem=50 Pods=100",
Expand Down Expand Up @@ -252,15 +252,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
{
name: "Patterns for vault logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/vault.txt",
patterns: []string{
"<_> [INFO] expiration: revoked lease: <_>",
},
},
{
name: "Patterns for calico logs",
drain: New(DefaultConfig()),
drain: New(DefaultConfig(), nil),
inputFile: "testdata/calico.txt",
patterns: []string{
`2024-05-08 <_> [DEBUG][216945] felix/table.go 870: Found forward-reference <_> ipVersion=0x4 <_> <_> [0:0]" table="nat"`,
Expand Down Expand Up @@ -383,8 +383,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
inputLines []string
}{
{
name: `should match each line against a pattern`,
drain: New(DefaultConfig()),
name: "should match each line against a pattern",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test test test",
"test test test",
Expand All @@ -393,8 +393,8 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
},
},
{
name: `should also match newlines`,
drain: New(DefaultConfig()),
name: "should also match newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test test test
`,
Expand All @@ -413,7 +413,6 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
}
t.Log(`Learned clusters`, tt.drain.Clusters())

for _, line := range tt.inputLines {
match := tt.drain.Match(line)
Expand All @@ -432,8 +431,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
inputLines []string
}{
{
name: `should extract patterns that all lines match`,
drain: New(DefaultConfig()),
name: "should extract patterns that all lines match",
drain: New(DefaultConfig(), nil),
inputLines: []string{
"test 1 test",
"test 2 test",
Expand All @@ -442,8 +441,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with newlines`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with newlines",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test
`,
Expand All @@ -456,8 +455,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line ends with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line ends with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
`test 1 test `,
`test 2 test `,
Expand All @@ -466,8 +465,8 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
},
{
name: `should extract patterns that match if line starts with empty space`,
drain: New(DefaultConfig()),
name: "should extract patterns that match if line starts with empty space",
drain: New(DefaultConfig(), nil),
inputLines: []string{
` test 1 test`,
` test 2 test`,
Expand All @@ -484,7 +483,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}
require.Equal(t, 1, len(tt.drain.Clusters()))
cluster := tt.drain.Clusters()[0]
t.Log(`Extracted cluster: `, cluster)

matcher, err := pattern.ParseLineFilter([]byte(cluster.String()))
require.NoError(t, err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/pattern/drain/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package drain

import "github.com/prometheus/client_golang/prometheus"

// Metrics holds the Prometheus counters that a Drain instance updates while
// learning patterns. Both fields may be backed by counters registered by the
// caller (see newIngesterMetrics in pkg/pattern/metrics.go); a nil *Metrics
// passed to drain.New disables metric collection entirely.
type Metrics struct {
// PatternsEvictedTotal is incremented each time a LogCluster is evicted
// from the LRU cluster cache (wired up as the simplelru onEvict callback
// in createLogClusterCache).
PatternsEvictedTotal prometheus.Counter
// PatternsDetectedTotal is incremented each time training a log line
// produces a brand-new cluster (i.e. no existing pattern matched).
PatternsDetectedTotal prometheus.Counter
}
2 changes: 1 addition & 1 deletion pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.logger)
inst, err = newInstance(instanceID, i.logger, i.metrics)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatte
}

func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int) *logproto.QueryPatternsResponse {
d := drain.New(drain.DefaultConfig())
d := drain.New(drain.DefaultConfig(), nil)
for _, p := range resp.Series {
d.TrainPattern(p.Pattern, p.Samples)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

func TestInstancePushQuery(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
inst, err := newInstance("foo", log.NewNopLogger())
inst, err := newInstance("foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = inst.Push(context.Background(), &push.PushRequest{
Expand Down
6 changes: 4 additions & 2 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ type instance struct {
streams *streamsMap
index *index.BitPrefixInvertedIndex
logger log.Logger
metrics *ingesterMetrics
}

func newInstance(instanceID string, logger log.Logger) (*instance, error) {
func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics) (*instance, error) {
index, err := index.NewBitPrefixWithShards(indexShards)
if err != nil {
return nil, err
Expand All @@ -43,6 +44,7 @@ func newInstance(instanceID string, logger log.Logger) (*instance, error) {
instanceID: instanceID,
streams: newStreamsMap(),
index: index,
metrics: metrics,
}
i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint)
return i, nil
Expand Down Expand Up @@ -138,7 +140,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream
}
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp)
s, err := newStream(fp, sortedLabels)
s, err := newStream(fp, sortedLabels, i.metrics)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/pattern/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
)

type ingesterMetrics struct {
flushQueueLength prometheus.Gauge
flushQueueLength prometheus.Gauge
patternsDiscardedTotal prometheus.Counter
patternsDetectedTotal prometheus.Counter
}

func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics {
Expand All @@ -17,5 +19,17 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Name: "flush_queue_length",
Help: "The total number of series pending in the flush queue.",
}),
patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_evicted_total",
Help: "The total number of patterns evicted from the LRU cache.",
}),
patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: "pattern_ingester",
Name: "patterns_detected_total",
Help: "The total number of patterns detected from incoming log lines.",
}),
}
}
6 changes: 5 additions & 1 deletion pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ type stream struct {
func newStream(
fp model.Fingerprint,
labels labels.Labels,
metrics *ingesterMetrics,
) (*stream, error) {
return &stream{
fp: fp,
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
patterns: drain.New(drain.DefaultConfig()),
patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{
PatternsEvictedTotal: metrics.patternsDiscardedTotal,
PatternsDetectedTotal: metrics.patternsDetectedTotal,
}),
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/pattern/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

func TestAddStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = stream.Push(context.Background(), []push.Entry{
Expand Down Expand Up @@ -44,7 +44,7 @@ func TestAddStream(t *testing.T) {

func TestPruneStream(t *testing.T) {
lbs := labels.New(labels.Label{Name: "test", Value: "test"})
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs)
stream, err := newStream(model.Fingerprint(lbs.Hash()), lbs, newIngesterMetrics(nil, "test"))
require.NoError(t, err)

err = stream.Push(context.Background(), []push.Entry{
Expand Down
Loading