diff --git a/go.mod b/go.mod index 8fe5d40b3d..1cc154445f 100644 --- a/go.mod +++ b/go.mod @@ -64,7 +64,7 @@ require ( github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361 github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a - github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 // k190 branch + github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b // k190 branch github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 // k180 branch github.com/grafana/pyroscope-go/godeltaprof v0.1.7 github.com/grafana/pyroscope/api v0.4.0 diff --git a/go.sum b/go.sum index 81107247e8..c0b7f8c4cb 100644 --- a/go.sum +++ b/go.sum @@ -1052,8 +1052,8 @@ github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d h1:YwbJJ/PrVWVd github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY= github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a h1:jqM4NNdx8LSquKo8bPx+XWn91S2b+sgNvEcFfSJQtHY= github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a/go.mod h1:ZXGGyeTUMenf/H1CDBK9lv3azjswfa0nVzLoQAYmnDc= -github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 h1:C191g5Ls8lIf9lkJEoScTQgoVDwUdK4HXKP5XtL+zAM= -github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU= +github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b h1:x5JsSnExxRl9kTMNqHebMCv0fn+V1+T16z7Tgz6xYf4= +github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU= github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 h1:6kPX7bngjBgUlHqADwZ6249UtzMaoQW5n0H8bOtnYeM= github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0= github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 h1:I9sRknI5ajd8whPOX0nBDXy5B6xUfhItClMy+6R4oqE= diff --git a/internal/component/loki/process/metric/metricvec.go b/internal/component/loki/process/metric/metricvec.go index d1a8d69399..d09cc53e35 100644 --- a/internal/component/loki/process/metric/metricvec.go +++ b/internal/component/loki/process/metric/metricvec.go @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool { return ok } +func (c *metricVec) DeleteAll() { + c.mtx.Lock() + defer c.mtx.Unlock() + c.metrics = map[model.Fingerprint]prometheus.Metric{} +} + // prune will remove all metrics which implement the Expirable interface and have expired // it does not take out a lock on the metrics map so whoever calls this function should do so. func (c *metricVec) prune() { diff --git a/internal/component/loki/process/stages/decolorize.go b/internal/component/loki/process/stages/decolorize.go index 26356ef155..33f365934b 100644 --- a/internal/component/loki/process/stages/decolorize.go +++ b/internal/component/loki/process/stages/decolorize.go @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry { func (m *decolorizeStage) Name() string { return StageTypeDecolorize } + +// Cleanup implements Stage. +func (*decolorizeStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/drop.go b/internal/component/loki/process/stages/drop.go index fae4c4559e..5be08b468d 100644 --- a/internal/component/loki/process/stages/drop.go +++ b/internal/component/loki/process/stages/drop.go @@ -222,3 +222,8 @@ func splitSource(s string) []string { func (m *dropStage) Name() string { return StageTypeDrop } + +// Cleanup implements Stage. +func (*dropStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/eventlogmessage.go b/internal/component/loki/process/stages/eventlogmessage.go index 08a35679e2..986eabf20a 100644 --- a/internal/component/loki/process/stages/eventlogmessage.go +++ b/internal/component/loki/process/stages/eventlogmessage.go @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string { return StageTypeEventLogMessage } +// Cleanup implements Stage. +func (*eventLogMessageStage) Cleanup() { + // no-op +} + // Sanitize a input string to convert it into a valid prometheus label // TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName func SanitizeFullLabelName(input string) string { diff --git a/internal/component/loki/process/stages/extensions.go b/internal/component/loki/process/stages/extensions.go index 3038048cf7..51262210ec 100644 --- a/internal/component/loki/process/stages/extensions.go +++ b/internal/component/loki/process/stages/extensions.go @@ -100,6 +100,11 @@ func (c *cri) Name() string { return StageTypeCRI } +// Cleanup implements Stage. +func (*cri) Cleanup() { + // no-op +} + // implements Stage interface func (c *cri) Run(entry chan Entry) chan Entry { entry = c.base.Run(entry) diff --git a/internal/component/loki/process/stages/geoip.go b/internal/component/loki/process/stages/geoip.go index 664dc5924b..7c8f43319b 100644 --- a/internal/component/loki/process/stages/geoip.go +++ b/internal/component/loki/process/stages/geoip.go @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string { return StageTypeGeoIP } +// Cleanup implements Stage. +func (*geoIPStage) Cleanup() { + // no-op +} + func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) { var ip net.IP if g.cfgs.Source != nil { diff --git a/internal/component/loki/process/stages/json.go b/internal/component/loki/process/stages/json.go index 026f9ebd7b..6624b0098c 100644 --- a/internal/component/loki/process/stages/json.go +++ b/internal/component/loki/process/stages/json.go @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string func (j *jsonStage) Name() string { return StageTypeJSON } + +// Cleanup implements Stage. +func (*jsonStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/limit.go b/internal/component/loki/process/stages/limit.go index 9a7ebe5fb7..dccaf740d6 100644 --- a/internal/component/loki/process/stages/limit.go +++ b/internal/component/loki/process/stages/limit.go @@ -130,6 +130,11 @@ func (m *limitStage) Name() string { return StageTypeLimit } +// Cleanup implements Stage. +func (*limitStage) Cleanup() { + // no-op +} + func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec { return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total", "A count of all log lines dropped as a result of a pipeline stage", diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index 3a06634670..2ed999638c 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) { func (m *matcherStage) Name() string { return StageTypeMatch } + +// Cleanup implements Stage. +func (*matcherStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/metric.go b/internal/component/loki/process/stages/metric.go index e364bf9ca6..2d05c4f006 100644 --- a/internal/component/loki/process/stages/metric.go +++ b/internal/component/loki/process/stages/metric.go @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg) } } - return toStage(&metricStage{ + return &metricStage{ logger: logger, cfg: config, metrics: metrics, - }), nil + }, nil } // metricStage creates and updates prometheus metrics based on extracted pipeline data @@ -118,6 +118,19 @@ type metricStage struct { metrics map[string]cfgCollector } +func (m *metricStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + for e := range in { + m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line) + out <- e + } + }() + return out +} + // Process implements Stage func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { for name, cc := range m.metrics { @@ -162,6 +175,20 @@ func (m *metricStage) Name() string { return StageTypeMetric } +// Cleanup implements Stage. +func (m *metricStage) Cleanup() { + for _, cfgCollector := range m.metrics { + switch vec := cfgCollector.collector.(type) { + case *metric.Counters: + vec.DeleteAll() + case *metric.Gauges: + vec.DeleteAll() + case *metric.Histograms: + vec.DeleteAll() + } + } +} + // recordCounter will update a counter metric func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) { // If value matching is defined, make sure value matches. diff --git a/internal/component/loki/process/stages/metric_test.go b/internal/component/loki/process/stages/metric_test.go index 9bc1747faf..fb5452a648 100644 --- a/internal/component/loki/process/stages/metric_test.go +++ b/internal/component/loki/process/stages/metric_test.go @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) { strings.NewReader(expectedMetrics)); err != nil { t.Fatalf("mismatch metrics: %v", err) } + + pl.Cleanup() + + if err := testutil.GatherAndCompare(registry, + strings.NewReader("")); err != nil { + t.Fatalf("mismatch metrics: %v", err) + } } func TestNegativeGauge(t *testing.T) { diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index d0853a00e5..1015d9e59e 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) { func (m *multilineStage) Name() string { return StageTypeMultiline } + +// Cleanup implements Stage. +func (*multilineStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pack.go b/internal/component/loki/process/stages/pack.go index a8cd50ca09..7b03afe945 100644 --- a/internal/component/loki/process/stages/pack.go +++ b/internal/component/loki/process/stages/pack.go @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry { func (m *packStage) Name() string { return StageTypePack } + +// Cleanup implements Stage. +func (*packStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index 12311417a1..043e16cce6 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string { return StageTypePipeline } +// Cleanup implements Stage. +func (p *Pipeline) Cleanup() { + for _, s := range p.stages { + s.Cleanup() + } +} + // Wrap implements EntryMiddleware func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { handlerIn := make(chan loki.Entry) @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler { return loki.NewEntryHandler(handlerIn, func() { once.Do(func() { close(handlerIn) }) wg.Wait() + p.Cleanup() }) } diff --git a/internal/component/loki/process/stages/sampling.go b/internal/component/loki/process/stages/sampling.go index d2ef316535..a7b12d3e2e 100644 --- a/internal/component/loki/process/stages/sampling.go +++ b/internal/component/loki/process/stages/sampling.go @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 { func (m *samplingStage) Name() string { return StageTypeSampling } + +// Cleanup implements Stage. +func (*samplingStage) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/stage.go b/internal/component/loki/process/stages/stage.go index 203d924503..3787daabac 100644 --- a/internal/component/loki/process/stages/stage.go +++ b/internal/component/loki/process/stages/stage.go @@ -61,6 +61,7 @@ type Entry struct { type Stage interface { Name() string Run(chan Entry) chan Entry + Cleanup() } func (entry *Entry) copy() *Entry { @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh } return s, nil } + +// Cleanup implements Stage. +func (*stageProcessor) Cleanup() { + // no-op +} diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index abaee55245..256bbb6df3 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string { return StageTypeStructuredMetadata } +// Cleanup implements Stage. +func (*structuredMetadataStage) Cleanup() { + // no-op +} + func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {