From 452b6422386c2249e775365bf538d06ee3accf11 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 14 May 2024 17:44:42 +0100 Subject: [PATCH] Clean up metrics generated from logs after a config reload. Synced from: https://github.com/grafana/loki/pull/12938 --- .../loki/process/metric/metricvec.go | 6 ++++ .../loki/process/stages/decolorize.go | 5 +++ .../component/loki/process/stages/drop.go | 5 +++ .../loki/process/stages/eventlogmessage.go | 5 +++ .../loki/process/stages/extensions.go | 5 +++ .../component/loki/process/stages/geoip.go | 5 +++ .../component/loki/process/stages/json.go | 5 +++ .../component/loki/process/stages/limit.go | 5 +++ .../component/loki/process/stages/match.go | 5 +++ .../component/loki/process/stages/metric.go | 31 +++++++++++++++++-- .../loki/process/stages/metric_test.go | 7 +++++ .../loki/process/stages/multiline.go | 5 +++ .../component/loki/process/stages/pack.go | 5 +++ .../component/loki/process/stages/pipeline.go | 8 +++++ .../component/loki/process/stages/sampling.go | 5 +++ .../component/loki/process/stages/stage.go | 6 ++++ .../process/stages/structured_metadata.go | 5 +++ 17 files changed, 116 insertions(+), 2 deletions(-) diff --git a/internal/component/loki/process/metric/metricvec.go b/internal/component/loki/process/metric/metricvec.go index d1a8d6939909..d09cc53e35b6 100644 --- a/internal/component/loki/process/metric/metricvec.go +++ b/internal/component/loki/process/metric/metricvec.go @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool { return ok } +func (c *metricVec) DeleteAll() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.metrics = map[model.Fingerprint]prometheus.Metric{} +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/internal/component/loki/process/stages/decolorize.go b/internal/component/loki/process/stages/decolorize.go index 26356ef1552c..33f365934b88 100644 --- a/internal/component/loki/process/stages/decolorize.go +++ b/internal/component/loki/process/stages/decolorize.go @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry { func (m *decolorizeStage) Name() string { return StageTypeDecolorize } + +// Cleanup implements Stage. +func (*decolorizeStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/drop.go b/internal/component/loki/process/stages/drop.go index 96c212ba3c96..81ed0439f370 100644 --- a/internal/component/loki/process/stages/drop.go +++ b/internal/component/loki/process/stages/drop.go @@ -222,3 +222,8 @@ func splitSource(s string) []string { func (m *dropStage) Name() string { return StageTypeDrop } + +// Cleanup implements Stage. +func (*dropStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/eventlogmessage.go b/internal/component/loki/process/stages/eventlogmessage.go index 247b80163911..840248201825 100644 --- a/internal/component/loki/process/stages/eventlogmessage.go +++ b/internal/component/loki/process/stages/eventlogmessage.go @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string { return StageTypeEventLogMessage } +// Cleanup implements Stage. +func (*eventLogMessageStage) Cleanup() { + // no-op +} + // Sanitize a input string to convert it into a valid prometheus label // TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { diff --git a/internal/component/loki/process/stages/extensions.go b/internal/component/loki/process/stages/extensions.go index 3abbfd624346..df7648831df0 100644 --- a/internal/component/loki/process/stages/extensions.go +++ b/internal/component/loki/process/stages/extensions.go @@ -100,6 +100,11 @@ func (c *cri) Name() string { return StageTypeCRI } +// Cleanup implements Stage. +func (*cri) Cleanup() { + // no-op +} + // implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) diff --git a/internal/component/loki/process/stages/geoip.go b/internal/component/loki/process/stages/geoip.go index 505077ecfe2d..f5f777689eb8 100644 --- a/internal/component/loki/process/stages/geoip.go +++ b/internal/component/loki/process/stages/geoip.go @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string { return StageTypeGeoIP } +// Cleanup implements Stage. +func (*geoIPStage) Cleanup() { + // no-op +} + func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) { var ip net.IP if g.cfgs.Source != nil { diff --git a/internal/component/loki/process/stages/json.go b/internal/component/loki/process/stages/json.go index 5f98209b46ae..d96a203fd746 100644 --- a/internal/component/loki/process/stages/json.go +++ b/internal/component/loki/process/stages/json.go @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string func (j *jsonStage) Name() string { return StageTypeJSON } + +// Cleanup implements Stage. +func (*jsonStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/limit.go b/internal/component/loki/process/stages/limit.go index aee76ee519f6..7d7d9c1dec8d 100644 --- a/internal/component/loki/process/stages/limit.go +++ b/internal/component/loki/process/stages/limit.go @@ -130,6 +130,11 @@ func (m *limitStage) Name() string { return StageTypeLimit } +// Cleanup implements Stage. +func (*limitStage) Cleanup() { + // no-op +} + func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec { return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total", "A count of all log lines dropped as a result of a pipeline stage", diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index 4b84bb99a90a..63cb0f30d2a6 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { func (m *matcherStage) Name() string { return StageTypeMatch } + +// Cleanup implements Stage. +func (*matcherStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/metric.go b/internal/component/loki/process/stages/metric.go index 983354abd6de..880b61b085c4 100644 --- a/internal/component/loki/process/stages/metric.go +++ b/internal/component/loki/process/stages/metric.go @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg) } } - return toStage(&metricStage{ + return &metricStage{ logger: logger, cfg: config, metrics: metrics, - }), nil + }, nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -118,6 +118,19 @@ type metricStage struct { metrics map[string]cfgCollector } +func (m *metricStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + for e := range in { + m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + out <- e + } + }() + return out +} + // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for name, cc := range m.metrics { @@ -162,6 +175,20 @@ func (m *metricStage) Name() string { return StageTypeMetric } +// Cleanup implements Stage. +func (m *metricStage) Cleanup() { + for _, cfgCollector := range m.metrics { + switch vec := cfgCollector.collector.(type) { + case *metric.Counters: + vec.DeleteAll() + case *metric.Gauges: + vec.DeleteAll() + case *metric.Histograms: + vec.DeleteAll() + } + } +} + // recordCounter will update a counter metric func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. diff --git a/internal/component/loki/process/stages/metric_test.go b/internal/component/loki/process/stages/metric_test.go index 559c5fc913b8..b270d22440aa 100644 --- a/internal/component/loki/process/stages/metric_test.go +++ b/internal/component/loki/process/stages/metric_test.go @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) { strings.NewReader(expectedMetrics)); err != nil { t.Fatalf("mismatch metrics: %v", err) } + + pl.Cleanup() + + if err := testutil.GatherAndCompare(registry, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } } func TestNegativeGauge(t *testing.T) { diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index 677fe5c09b57..c9cb7cb21394 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { func (m *multilineStage) Name() string { return StageTypeMultiline } + +// Cleanup implements Stage. +func (*multilineStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pack.go b/internal/component/loki/process/stages/pack.go index a5d1633b4011..0a2cf6a0b3a7 100644 --- a/internal/component/loki/process/stages/pack.go +++ b/internal/component/loki/process/stages/pack.go @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry { func (m *packStage) Name() string { return StageTypePack } + +// Cleanup implements Stage. +func (*packStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index e24642c84cba..57a9a58e1104 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string { return StageTypePipeline } +// Cleanup implements Stage. +func (p *Pipeline) Cleanup() { + for _, s := range p.stages { + s.Cleanup() + } +} + // Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { handlerIn := make(chan loki.Entry) @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { return loki.NewEntryHandler(handlerIn, func() { once.Do(func() { close(handlerIn) }) wg.Wait() + p.Cleanup() }) } diff --git a/internal/component/loki/process/stages/sampling.go b/internal/component/loki/process/stages/sampling.go index 53bb6ad2e579..dec5976a6db3 100644 --- a/internal/component/loki/process/stages/sampling.go +++ b/internal/component/loki/process/stages/sampling.go @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 { func (m *samplingStage) Name() string { return StageTypeSampling } + +// Cleanup implements Stage. +func (*samplingStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index 0958bfdf09b2..2cb69c26e48f 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -61,6 +61,7 @@ type Entry struct { type Stage interface { Name() string Run(chan Entry) chan Entry + Cleanup() } func (entry *Entry) copy() *Entry { @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh } return s, nil } + +// Cleanup implements Stage. +func (*stageProcessor) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index abaee55245b8..256bbb6df3c3 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string { return StageTypeStructuredMetadata } +// Cleanup implements Stage. +func (*structuredMetadataStage) Cleanup() { + // no-op +} + func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {