Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [k190] fix: promtail; clean up metrics generated from logs after a config reload. #12938

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clients/pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (c *cri) Name() string {
return "cri"
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions clients/pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
metrics[name] = collector
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: *cfgs,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -142,6 +142,19 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
for name, collector := range m.metrics {
Expand Down Expand Up @@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
// nolint:goconst
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
Expand Down
9 changes: 8 additions & 1 deletion clients/pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down Expand Up @@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
}

var (
Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ type Pipeline struct {
dropCount *prometheus.CounterVec
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
Expand Down Expand Up @@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
}
return creator(params)
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/structuredmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ The metrics stage allows for defining metrics from the extracted data.

Created metrics are not pushed to Loki and are instead exposed via Promtail's
`/metrics` endpoint. Prometheus should be configured to scrape Promtail to be
able to retrieve the metrics configured by this stage.
able to retrieve the metrics configured by this stage.
If Promtail's configuration is reloaded, all metrics will be reset.


```yaml
Expand Down
3 changes: 2 additions & 1 deletion docs/sources/send-data/promtail/stages/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ The `metrics` stage is an action stage that allows for defining and updating
metrics based on data from the extracted map. Note that created metrics are not
pushed to Loki and are instead exposed via Promtail's `/metrics` endpoint.
Prometheus should be configured to scrape Promtail to be able to retrieve the
metrics configured by this stage.
metrics configured by this stage. If Promtail's configuration is reloaded,
all metrics will be reset.

## Schema

Expand Down
Loading