Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Loki and sync some of the Promtail code #836

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
github.com/grafana/jfr-parser/pprof v0.0.0-20240126072739-986e71dc0361
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d
github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 // k190 branch
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b // k190 branch
github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 // k180 branch
github.com/grafana/pyroscope-go/godeltaprof v0.1.7
github.com/grafana/pyroscope/api v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1052,8 +1052,8 @@ github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d h1:YwbJJ/PrVWVd
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a h1:jqM4NNdx8LSquKo8bPx+XWn91S2b+sgNvEcFfSJQtHY=
github.com/grafana/kafka_exporter v0.0.0-20240409084445-5e3488ad9f9a/go.mod h1:ZXGGyeTUMenf/H1CDBK9lv3azjswfa0nVzLoQAYmnDc=
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153 h1:C191g5Ls8lIf9lkJEoScTQgoVDwUdK4HXKP5XtL+zAM=
github.com/grafana/loki v1.6.2-0.20240221085104-f9d188620153/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU=
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b h1:x5JsSnExxRl9kTMNqHebMCv0fn+V1+T16z7Tgz6xYf4=
github.com/grafana/loki v1.6.2-0.20240510183741-cef4c2826b4b/go.mod h1:j2XCl3SmslPf+3Vs7uyoaJE/QkmUlL9JzTBTShSOSiU=
github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872 h1:6kPX7bngjBgUlHqADwZ6249UtzMaoQW5n0H8bOtnYeM=
github.com/grafana/loki/pkg/push v0.0.0-20231212100434-384e5c2dc872/go.mod h1:f3JSoxBTPXX5ec4FxxeC19nTBSxoTz+cBgS3cYLMcr0=
github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 h1:I9sRknI5ajd8whPOX0nBDXy5B6xUfhItClMy+6R4oqE=
Expand Down
6 changes: 6 additions & 0 deletions internal/component/loki/process/metric/metricvec.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (c *metricVec) Delete(labels model.LabelSet) bool {
return ok
}

func (c *metricVec) DeleteAll() {
c.mtx.Lock()
defer c.mtx.Unlock()
c.metrics = map[model.Fingerprint]prometheus.Metric{}
}

// prune will remove all metrics which implement the Expirable interface and have expired
// it does not take out a lock on the metrics map so whoever calls this function should do so.
func (c *metricVec) prune() {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/decolorize.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,8 @@ func (m *decolorizeStage) Run(in chan Entry) chan Entry {
func (m *decolorizeStage) Name() string {
return StageTypeDecolorize
}

// Cleanup implements Stage.
func (*decolorizeStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,8 @@ func splitSource(s string) []string {
func (m *dropStage) Name() string {
return StageTypeDrop
}

// Cleanup implements Stage.
func (*dropStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/eventlogmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *eventLogMessageStage) Name() string {
return StageTypeEventLogMessage
}

// Cleanup implements Stage.
func (*eventLogMessageStage) Cleanup() {
// no-op
}

// Sanitize a input string to convert it into a valid prometheus label
// TODO: switch to prometheus/prometheus/util/strutil/SanitizeFullLabelName
func SanitizeFullLabelName(input string) string {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func (c *cri) Name() string {
return StageTypeCRI
}

// Cleanup implements Stage.
func (*cri) Cleanup() {
// no-op
}

// implements Stage interface
func (c *cri) Run(entry chan Entry) chan Entry {
entry = c.base.Run(entry)
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func (g *geoIPStage) Name() string {
return StageTypeGeoIP
}

// Cleanup implements Stage.
func (*geoIPStage) Cleanup() {
// no-op
}

func (g *geoIPStage) process(_ model.LabelSet, extracted map[string]interface{}) {
var ip net.IP
if g.cfgs.Source != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,8 @@ func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string
func (j *jsonStage) Name() string {
return StageTypeJSON
}

// Cleanup implements Stage.
func (*jsonStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (m *limitStage) Name() string {
return StageTypeLimit
}

// Cleanup implements Stage.
func (*limitStage) Cleanup() {
// no-op
}

func getDropCountByLabelMetric(registerer prometheus.Registerer) *prometheus.CounterVec {
return registerCounterVec(registerer, "loki_process", "dropped_lines_by_label_total",
"A count of all log lines dropped as a result of a pipeline stage",
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,8 @@ func (m *matcherStage) processLogQL(e Entry) (Entry, bool) {
func (m *matcherStage) Name() string {
return StageTypeMatch
}

// Cleanup implements Stage.
func (*matcherStage) Cleanup() {
// no-op
}
31 changes: 29 additions & 2 deletions internal/component/loki/process/stages/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus
return nil, fmt.Errorf("undefined stage type in '%v', exiting", cfg)
}
}
return toStage(&metricStage{
return &metricStage{
logger: logger,
cfg: config,
metrics: metrics,
}), nil
}, nil
}

// metricStage creates and updates prometheus metrics based on extracted pipeline data
Expand All @@ -118,6 +118,19 @@ type metricStage struct {
metrics map[string]cfgCollector
}

func (m *metricStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

for e := range in {
m.Process(e.Labels, e.Extracted, &e.Timestamp, &e.Line)
out <- e
}
}()
return out
}

// Process implements Stage
func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for name, cc := range m.metrics {
Expand Down Expand Up @@ -162,6 +175,20 @@ func (m *metricStage) Name() string {
return StageTypeMetric
}

// Cleanup implements Stage.
func (m *metricStage) Cleanup() {
for _, cfgCollector := range m.metrics {
switch vec := cfgCollector.collector.(type) {
case *metric.Counters:
vec.DeleteAll()
case *metric.Gauges:
vec.DeleteAll()
case *metric.Histograms:
vec.DeleteAll()
}
}
}

// recordCounter will update a counter metric
func (m *metricStage) recordCounter(name string, counter *metric.Counters, labels model.LabelSet, v interface{}) {
// If value matching is defined, make sure value matches.
Expand Down
7 changes: 7 additions & 0 deletions internal/component/loki/process/stages/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ func TestMetricsPipeline(t *testing.T) {
strings.NewReader(expectedMetrics)); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}

pl.Cleanup()

if err := testutil.GatherAndCompare(registry,
strings.NewReader("")); err != nil {
t.Fatalf("mismatch metrics: %v", err)
}
}

func TestNegativeGauge(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,8 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
func (m *multilineStage) Name() string {
return StageTypeMultiline
}

// Cleanup implements Stage.
func (*multilineStage) Cleanup() {
// no-op
}
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,8 @@ func (m *packStage) pack(e Entry) Entry {
func (m *packStage) Name() string {
return StageTypePack
}

// Cleanup implements Stage.
func (*packStage) Cleanup() {
// no-op
}
8 changes: 8 additions & 0 deletions internal/component/loki/process/stages/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ func (p *Pipeline) Name() string {
return StageTypePipeline
}

// Cleanup implements Stage.
func (p *Pipeline) Cleanup() {
for _, s := range p.stages {
s.Cleanup()
}
}

// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
handlerIn := make(chan loki.Entry)
Expand Down Expand Up @@ -181,6 +188,7 @@ func (p *Pipeline) Wrap(next loki.EntryHandler) loki.EntryHandler {
return loki.NewEntryHandler(handlerIn, func() {
once.Do(func() { close(handlerIn) })
wg.Wait()
p.Cleanup()
})
}

Expand Down
5 changes: 5 additions & 0 deletions internal/component/loki/process/stages/sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,8 @@ func (m *samplingStage) randomNumber() uint64 {
func (m *samplingStage) Name() string {
return StageTypeSampling
}

// Cleanup implements Stage.
func (*samplingStage) Cleanup() {
// no-op
}
6 changes: 6 additions & 0 deletions internal/component/loki/process/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Entry struct {
type Stage interface {
Name() string
Run(chan Entry) chan Entry
Cleanup()
}

func (entry *Entry) copy() *Entry {
Expand Down Expand Up @@ -243,3 +244,8 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh
}
return s, nil
}

// Cleanup implements Stage.
func (*stageProcessor) Cleanup() {
// no-op
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ func (s *structuredMetadataStage) Name() string {
return StageTypeStructuredMetadata
}

// Cleanup implements Stage.
func (*structuredMetadataStage) Cleanup() {
// no-op
}

func (s *structuredMetadataStage) Run(in chan Entry) chan Entry {
return RunWith(in, func(e Entry) Entry {
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
Expand Down
Loading