Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Patch k195 with bugfixes for a Grafana Agent #15077

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clients/pkg/logentry/metric/metricvec.go
Original file line number Diff line number Diff line change
@@ -84,6 +84,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/decolorize.go
Original file line number Diff line number Diff line change
@@ -33,3 +33,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/drop.go
Original file line number Diff line number Diff line change
@@ -266,3 +266,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
@@ -142,6 +142,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/extensions.go
Original file line number Diff line number Diff line change
@@ -59,6 +59,11 @@ func (c *cri) Name() string {
return "cri"
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/geoip.go
Original file line number Diff line number Diff line change
@@ -123,6 +123,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
var ip net.IP
if g.cfgs.Source != nil {
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/json.go
Original file line number Diff line number Diff line change
@@ -188,3 +188,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/limit.go
Original file line number Diff line number Diff line change
@@ -138,6 +138,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return util.RegisterCounterVec(registerer, "logentry", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/match.go
Original file line number Diff line number Diff line change
@@ -206,3 +206,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions clients/pkg/logentry/stages/metrics.go
Original file line number Diff line number Diff line change
@@ -128,11 +128,11 @@ func newMetricStage(logger log.Logger, config interface{}, registry prometheus.R
metrics[name] = collector
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: *cfgs,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
@@ -142,6 +142,19 @@ type metricStage struct {
metrics map[string]prometheus.Collector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) {
for name, collector := range m.metrics {
@@ -178,6 +191,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, collector := range m.metrics {
switch vec := collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
// nolint:goconst
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
9 changes: 8 additions & 1 deletion clients/pkg/logentry/stages/metrics_test.go
Original file line number Diff line number Diff line change
@@ -127,6 +127,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
@@ -435,7 +442,7 @@ func TestDefaultIdleDuration(t *testing.T) {
if err != nil {
t.Fatalf("failed to create stage with metrics: %v", err)
}
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*stageProcessor).Processor.(*metricStage).cfg["total_keys"].maxIdleSec)
assert.Equal(t, int64(5*time.Minute.Seconds()), ms.(*metricStage).cfg["total_keys"].maxIdleSec)
}

var (
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -229,3 +229,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/pack.go
Original file line number Diff line number Diff line change
@@ -218,3 +218,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions clients/pkg/logentry/stages/pipeline.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,13 @@ type Pipeline struct {
dropCount *prometheus.CounterVec
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// NewPipeline creates a new log entry pipeline from a configuration
func NewPipeline(logger log.Logger, stgs PipelineStages, jobName *string, registerer prometheus.Registerer) (*Pipeline, error) {
st := []Stage{}
@@ -169,6 +176,7 @@ func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/sampling.go
Original file line number Diff line number Diff line change
@@ -111,3 +111,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions clients/pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
@@ -228,3 +229,8 @@ func New(logger log.Logger, jobName *string, stageType string,
}
return creator(params)
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions clients/pkg/logentry/stages/structuredmetadata.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Loading